diff options
| author | Conway <[email protected]> | 2026-06-10 16:16:34 -0400 |
|---|---|---|
| committer | Conway <[email protected]> | 2026-06-10 16:16:34 -0400 |
| commit | b23aced316e19d96bc71bec76fa5e91d10dfbee7 (patch) | |
| tree | d48ab67cdc2468639b50a5df5861fe53fcddbbc9 | |
Initial commit: dlit v.1.5 — terminal yt-dlp wrapper
Queued yt-dlp downloads with mp3/video/none presets, ANSI status icons,
ID3 tagging, and a postprocessor that crops solid-color bars off
square album art before embedding the cover.
| -rw-r--r-- | .gitignore | 4 | ||||
| -rw-r--r-- | app.py | 329 | ||||
| -rw-r--r-- | dlit.md | 38 | ||||
| -rw-r--r-- | requirements.txt | 2 |
4 files changed, 373 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f959714 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +__pycache__/ +*.py[cod] +.venv/ +.claude/ @@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +"""dlit — a tiny terminal wrapper for yt-dlp. + +Paste a link at the prompt; downloads queue and run serially in the +background. Commands: :mp3, :video, :none switch the active preset. +Ctrl-D or :q to quit. +""" + +from __future__ import annotations + +import os +import queue +import readline # noqa: F401 (importing enables line editing in input()) +import subprocess +import sys +import tempfile +import threading +import time +from pathlib import Path + +import yt_dlp +from yt_dlp.postprocessor import EmbedThumbnailPP +from yt_dlp.postprocessor.common import PostProcessor + +VERSION = "1.5" +DOWNLOAD_DIR = Path.home() / "Downloads" +PROMPT = "dlit> " + +PRESETS: dict[str, dict] = { + "mp3": { + "format": "bestaudio/best", + "writethumbnail": True, + "postprocessors": [ + {"key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "320"}, + {"key": "FFmpegMetadata"}, + # EmbedThumbnail is registered manually in worker() so that + # SquareThumbnailCropper can run between metadata-write and embed. + ], + "postprocessor_args": { + "metadata": [ + "-metadata", f"comment=DLit {VERSION} (Conway)", + "-metadata", "genre=", + ], + }, + }, + "video": {"format": "bestvideo+bestaudio/best"}, + "none": {}, +} + +_print_lock = threading.Lock() +_line_distance: dict[int, int] = {} # job_id -> rows above the current cursor position +_job_title: dict[int, str] = {} # job_id -> resolved title (once known) + +_RESET = "\x1b[0m" +ICON_QUEUED = f"\x1b[35m~{_RESET}" # purple +ICON_DOWNLOADING = f"\x1b[33m↓{_RESET}" # yellow +ICON_CONVERTING = f"\x1b[34m↻{_RESET}" # blue +ICON_DONE = f"\x1b[32m✓{_RESET}" # green +ICON_FAILED = f"\x1b[31m✗{_RESET}" # red + + +def add_job_line(job_id: int, text: str) -> None: + """Main thread: print a new tracked line above the next prompt.""" + with _print_lock: + # Existing tracked lines move up by 2 rows: the just-submitted prompt line + # stays on screen, and the new tracked line we're about to print is one more. + for k in _line_distance: + _line_distance[k] += 2 + _line_distance[job_id] = 1 + sys.stdout.write(text + "\n") + sys.stdout.flush() + + +def update_job_line(job_id: int, text: str) -> None: + """Worker thread: rewrite this job's tracked line in place; cursor returns to its prior spot.""" + with _print_lock: + d = _line_distance.get(job_id) + if d is None: + return + sys.stdout.write( + "\x1b7" # save cursor (DECSC) + f"\x1b[{d}A" # up d rows + "\r\x1b[K" # clear that line + f"{text}" + "\x1b8" # restore cursor (DECRC) + ) + sys.stdout.flush() + + +def make_opts(job_id: int, preset: str) -> dict: + last = {"pct": -5.0, "t": 0.0} + + def progress(d: dict) -> None: + info = d.get("info_dict") or {} + title = info.get("title") + if title: + _job_title[job_id] = title + status = d.get("status") + shown_title = _job_title.get(job_id, "…")[:60] + if status == "downloading": + total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 + done = d.get("downloaded_bytes") or 0 + pct = (done * 100.0 / total) if total else 0.0 + now = time.monotonic() + if pct - last["pct"] >= 5 or now - last["t"] >= 2.0: + last["pct"], last["t"] = pct, now + speed = d.get("speed") or 0 + speed_s = f"{speed / 1_000_000:4.1f} MB/s" if speed else " - MB/s" + update_job_line( + job_id, + f"[{ICON_DOWNLOADING}] {shown_title} {pct:5.1f}% {speed_s}", + ) + elif status == "finished" and preset == "mp3": + update_job_line(job_id, f"[{ICON_CONVERTING}] {shown_title}") + + return { + "outtmpl": str(DOWNLOAD_DIR / "%(title)s.%(ext)s"), + "noplaylist": True, + "quiet": True, + "no_warnings": True, + "no_color": True, + "noprogress": True, + "progress_hooks": [progress], + **PRESETS[preset], + } + + +class SquareThumbnailCropper(PostProcessor): + """Detect a square cover image padded into a 16:9 YouTube thumbnail and crop + the bars off before EmbedThumbnail runs. + + Works on bars of any solid color (black, white, the album's accent color, + etc.) by sampling the leftmost/rightmost columns and walking inward until + the column stops matching the edge color. Guarded by aspect ratio: if the + detected content region isn't roughly square the original is left untouched, + so YouTube videos with natively 16:9 thumbnails or artwork with uniform + color edges that fool the sampler just pass through uncropped. + """ + + SQUARE_TOLERANCE = 0.05 # accept detected aspect ratios within ±5% of 1:1 + COLOR_TOLERANCE = 12 # per-channel RGB deviation a column may have from edge + + def run(self, info): + path = self._thumbnail_path(info) + if not path or not os.path.exists(path): + return [], info + crop = self._detect(path) + if not crop: + return [], info + w, h, x, y = crop + if h <= 0 or abs(w / h - 1.0) > self.SQUARE_TOLERANCE: + return [], info + self._apply(path, w, h, x, y) + return [], info + + @staticmethod + def _thumbnail_path(info: dict): + for t in reversed(info.get("thumbnails") or []): + fp = t.get("filepath") + if fp: + return fp + return None + + @classmethod + def _detect(cls, path: str): + dims = cls._probe_dimensions(path) + if not dims: + return None + w, h = dims + buf = cls._dump_pixels(path, w, h) + if buf is None: + return None + + sample_ys = [h * i // 6 for i in range(1, 6)] + tol = cls.COLOR_TOLERANCE + + def col_uniform(x: int, ref: tuple) -> bool: + for y in sample_ys: + i = (y * w + x) * 3 + if (abs(buf[i] - ref[0]) > tol + or abs(buf[i + 1] - ref[1]) > tol + or abs(buf[i + 2] - ref[2]) > tol): + return False + return True + + def pix(x: int, y: int) -> tuple: + i = (y * w + x) * 3 + return buf[i], buf[i + 1], buf[i + 2] + + left_ref = pix(0, h // 2) + left_w = 0 + for x in range(w // 2): + if not col_uniform(x, left_ref): + break + left_w = x + 1 + + right_ref = pix(w - 1, h // 2) + right_w = 0 + for x in range(w - 1, w // 2, -1): + if not col_uniform(x, right_ref): + break + right_w += 1 + + content_w = w - left_w - right_w + if content_w <= 0: + return None + return content_w, h, left_w, 0 + + @staticmethod + def _probe_dimensions(path: str): + try: + res = subprocess.run( + ["ffprobe", "-v", "error", "-select_streams", "v:0", + "-show_entries", "stream=width,height", + "-of", "csv=p=0", path], + capture_output=True, text=True, timeout=10, + ) + except (FileNotFoundError, subprocess.TimeoutExpired): + return None + if res.returncode != 0: + return None + parts = res.stdout.strip().split(",") + if len(parts) != 2: + return None + try: + w, h = int(parts[0]), int(parts[1]) + except ValueError: + return None + return (w, h) if w > 0 and h > 0 else None + + @staticmethod + def _dump_pixels(path: str, w: int, h: int): + try: + res = subprocess.run( + ["ffmpeg", "-hide_banner", "-loglevel", "error", + "-i", path, "-f", "rawvideo", "-pix_fmt", "rgb24", "-"], + capture_output=True, timeout=15, + ) + except (FileNotFoundError, subprocess.TimeoutExpired): + return None + if res.returncode != 0 or len(res.stdout) != w * h * 3: + return None + return res.stdout + + @staticmethod + def _apply(path: str, w: int, h: int, x: int, y: int) -> None: + suffix = os.path.splitext(path)[1] or ".jpg" + # dir= keeps the temp file on the same filesystem as the target so the + # final os.replace() is an atomic rename, not a cross-device error. + fd, tmp = tempfile.mkstemp(suffix=suffix, dir=os.path.dirname(path) or ".") + os.close(fd) + try: + subprocess.run( + ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y", + "-i", path, "-vf", f"crop={w}:{h}:{x}:{y}", tmp], + check=True, capture_output=True, timeout=15, + ) + os.replace(tmp, path) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + try: + os.unlink(tmp) + except OSError: + pass + + +def worker(jobs: "queue.Queue") -> None: + while True: + item = jobs.get() + if item is None: + return + job_id, url, preset = item + try: + with yt_dlp.YoutubeDL(make_opts(job_id, preset)) as ydl: + if preset == "mp3": + ydl.add_post_processor(SquareThumbnailCropper(ydl), when="post_process") + ydl.add_post_processor(EmbedThumbnailPP(ydl), when="post_process") + info = ydl.extract_info(url, download=True) + title = (info or {}).get("title") or _job_title.get(job_id) or url + _job_title[job_id] = title + update_job_line(job_id, f"[{ICON_DONE}] {title[:60]}") + except Exception as exc: # noqa: BLE001 + err = str(exc).splitlines()[0][:60] + title = _job_title.get(job_id) or url + update_job_line(job_id, f"[{ICON_FAILED}] {title[:60]} {err}") + + +def main() -> None: + DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) + jobs: "queue.Queue" = queue.Queue() + threading.Thread(target=worker, args=(jobs,), daemon=True).start() + + preset = "mp3" + next_id = 1 + print(f"dlit v.{VERSION} -> {DOWNLOAD_DIR} preset={preset} (:q to quit)") + while True: + try: + line = input(PROMPT).strip() + except (EOFError, KeyboardInterrupt): + print() + break + if not line: + continue + if line.startswith(":"): + cmd = line[1:] + if cmd in ("q", "quit", "exit"): + break + if cmd in PRESETS: + preset = cmd + with _print_lock: + for k in _line_distance: + _line_distance[k] += 2 + sys.stdout.write(f" preset -> {preset}\n") + sys.stdout.flush() + else: + with _print_lock: + for k in _line_distance: + _line_distance[k] += 2 + sys.stdout.write( + f" unknown command: :{cmd} (try :mp3, :video, :none, :q)\n" + ) + sys.stdout.flush() + continue + jobs.put((next_id, line, preset)) + add_job_line(next_id, f"[{ICON_QUEUED}] {line}") + next_id += 1 + + +if __name__ == "__main__": + main() @@ -0,0 +1,38 @@ +DLIt - Download It! + +A python tui wrapper for yt-dlp. + +Instead of having to remember to type yt-dlp -x --audio-format mp3 --audio-quality 0 'https://youtu.be/DXvBWE782uw?si=vgGhooy5_X0BdAPr', or use the up arrow key until I find it in my zsh history, and then clear the link arg and replace it with the next video I want to download, I wanted to make a very simple python program. + +I just want highest-quality .mp3 files of my favorite songs, and I get them from youtube using dlp if they're not on bandcamp. + +I type dlit in a terminal to open the tui (assuming I have an alias already, and if not, python ~/Documents/Repositories/dlit/app.py) + +It should open a tui with a simple input. You paste the video link like so: + +https://youtu.be/-WsmHIJ2QJY?si=-lVVj4Q4gIzSvxbF + +and it should effectively run yt-dlp -x --audio-format mp3 --audio-quality 0 (the link you just posted) + +or some variation depending on how we make this thing. + +It should download songs to Downloads. By default yt-dlp downloads to ~. + +Later on we could do a settings menu like btop has, where we press escape to see a settings block where we can configure the download folder, yt-dlp flags, etc. We'll get around to that eventually. + +Most importantly, DLIt should allow us to queue up youtube links. Typically using yt-dlp from the terminal you have to download one video at a time- our program should let us add to an ongoing queue so that I can minimize the time it takes me to get all the links ready to be downloaded. + +The queue should appear in the TUI in a 0% to 100% [ ] sort-of download progress bar. + +once that's done we need to add this to my cgit + + +Features: + +Hmm, maybe a preset system. Like you can choose between "No Preset", ".mp3", "full video" + +the default being .mp3 + +if you change a setting it goes to No Preset + + diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..f7d5a99 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +textual>=0.80 +yt-dlp |
