diff options
| author | Conway <[email protected]> | 2026-06-10 16:16:34 -0400 |
|---|---|---|
| committer | Conway <[email protected]> | 2026-06-10 16:16:34 -0400 |
| commit | b23aced316e19d96bc71bec76fa5e91d10dfbee7 (patch) | |
| tree | d48ab67cdc2468639b50a5df5861fe53fcddbbc9 /app.py | |
Initial commit: dlit v.1.5 — terminal yt-dlp wrapper
Queued yt-dlp downloads with mp3/video/none presets, ANSI status icons,
ID3 tagging, and a postprocessor that crops solid-color bars off
square album art before embedding the cover.
Diffstat (limited to 'app.py')
| -rw-r--r-- | app.py | 329 |
1 files changed, 329 insertions, 0 deletions
@@ -0,0 +1,329 @@ +#!/usr/bin/env python3 +"""dlit — a tiny terminal wrapper for yt-dlp. + +Paste a link at the prompt; downloads queue and run serially in the +background. Commands: :mp3, :video, :none switch the active preset. +Ctrl-D or :q to quit. +""" + +from __future__ import annotations + +import os +import queue +import readline # noqa: F401 (importing enables line editing in input()) +import subprocess +import sys +import tempfile +import threading +import time +from pathlib import Path + +import yt_dlp +from yt_dlp.postprocessor import EmbedThumbnailPP +from yt_dlp.postprocessor.common import PostProcessor + +VERSION = "1.5" +DOWNLOAD_DIR = Path.home() / "Downloads" +PROMPT = "dlit> " + +PRESETS: dict[str, dict] = { + "mp3": { + "format": "bestaudio/best", + "writethumbnail": True, + "postprocessors": [ + {"key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "320"}, + {"key": "FFmpegMetadata"}, + # EmbedThumbnail is registered manually in worker() so that + # SquareThumbnailCropper can run between metadata-write and embed. + ], + "postprocessor_args": { + "metadata": [ + "-metadata", f"comment=DLit {VERSION} (Conway)", + "-metadata", "genre=", + ], + }, + }, + "video": {"format": "bestvideo+bestaudio/best"}, + "none": {}, +} + +_print_lock = threading.Lock() +_line_distance: dict[int, int] = {} # job_id -> rows above the current cursor position +_job_title: dict[int, str] = {} # job_id -> resolved title (once known) + +_RESET = "\x1b[0m" +ICON_QUEUED = f"\x1b[35m~{_RESET}" # purple +ICON_DOWNLOADING = f"\x1b[33m↓{_RESET}" # yellow +ICON_CONVERTING = f"\x1b[34m↻{_RESET}" # blue +ICON_DONE = f"\x1b[32m✓{_RESET}" # green +ICON_FAILED = f"\x1b[31m✗{_RESET}" # red + + +def add_job_line(job_id: int, text: str) -> None: + """Main thread: print a new tracked line above the next prompt.""" + with _print_lock: + # Existing tracked lines move up by 2 rows: the just-submitted prompt line + # stays on screen, and the new tracked line we're about to print is one more. + for k in _line_distance: + _line_distance[k] += 2 + _line_distance[job_id] = 1 + sys.stdout.write(text + "\n") + sys.stdout.flush() + + +def update_job_line(job_id: int, text: str) -> None: + """Worker thread: rewrite this job's tracked line in place; cursor returns to its prior spot.""" + with _print_lock: + d = _line_distance.get(job_id) + if d is None: + return + sys.stdout.write( + "\x1b7" # save cursor (DECSC) + f"\x1b[{d}A" # up d rows + "\r\x1b[K" # clear that line + f"{text}" + "\x1b8" # restore cursor (DECRC) + ) + sys.stdout.flush() + + +def make_opts(job_id: int, preset: str) -> dict: + last = {"pct": -5.0, "t": 0.0} + + def progress(d: dict) -> None: + info = d.get("info_dict") or {} + title = info.get("title") + if title: + _job_title[job_id] = title + status = d.get("status") + shown_title = _job_title.get(job_id, "…")[:60] + if status == "downloading": + total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 + done = d.get("downloaded_bytes") or 0 + pct = (done * 100.0 / total) if total else 0.0 + now = time.monotonic() + if pct - last["pct"] >= 5 or now - last["t"] >= 2.0: + last["pct"], last["t"] = pct, now + speed = d.get("speed") or 0 + speed_s = f"{speed / 1_000_000:4.1f} MB/s" if speed else " - MB/s" + update_job_line( + job_id, + f"[{ICON_DOWNLOADING}] {shown_title} {pct:5.1f}% {speed_s}", + ) + elif status == "finished" and preset == "mp3": + update_job_line(job_id, f"[{ICON_CONVERTING}] {shown_title}") + + return { + "outtmpl": str(DOWNLOAD_DIR / "%(title)s.%(ext)s"), + "noplaylist": True, + "quiet": True, + "no_warnings": True, + "no_color": True, + "noprogress": True, + "progress_hooks": [progress], + **PRESETS[preset], + } + + +class SquareThumbnailCropper(PostProcessor): + """Detect a square cover image padded into a 16:9 YouTube thumbnail and crop + the bars off before EmbedThumbnail runs. + + Works on bars of any solid color (black, white, the album's accent color, + etc.) by sampling the leftmost/rightmost columns and walking inward until + the column stops matching the edge color. Guarded by aspect ratio: if the + detected content region isn't roughly square the original is left untouched, + so YouTube videos with natively 16:9 thumbnails or artwork with uniform + color edges that fool the sampler just pass through uncropped. + """ + + SQUARE_TOLERANCE = 0.05 # accept detected aspect ratios within ±5% of 1:1 + COLOR_TOLERANCE = 12 # per-channel RGB deviation a column may have from edge + + def run(self, info): + path = self._thumbnail_path(info) + if not path or not os.path.exists(path): + return [], info + crop = self._detect(path) + if not crop: + return [], info + w, h, x, y = crop + if h <= 0 or abs(w / h - 1.0) > self.SQUARE_TOLERANCE: + return [], info + self._apply(path, w, h, x, y) + return [], info + + @staticmethod + def _thumbnail_path(info: dict): + for t in reversed(info.get("thumbnails") or []): + fp = t.get("filepath") + if fp: + return fp + return None + + @classmethod + def _detect(cls, path: str): + dims = cls._probe_dimensions(path) + if not dims: + return None + w, h = dims + buf = cls._dump_pixels(path, w, h) + if buf is None: + return None + + sample_ys = [h * i // 6 for i in range(1, 6)] + tol = cls.COLOR_TOLERANCE + + def col_uniform(x: int, ref: tuple) -> bool: + for y in sample_ys: + i = (y * w + x) * 3 + if (abs(buf[i] - ref[0]) > tol + or abs(buf[i + 1] - ref[1]) > tol + or abs(buf[i + 2] - ref[2]) > tol): + return False + return True + + def pix(x: int, y: int) -> tuple: + i = (y * w + x) * 3 + return buf[i], buf[i + 1], buf[i + 2] + + left_ref = pix(0, h // 2) + left_w = 0 + for x in range(w // 2): + if not col_uniform(x, left_ref): + break + left_w = x + 1 + + right_ref = pix(w - 1, h // 2) + right_w = 0 + for x in range(w - 1, w // 2, -1): + if not col_uniform(x, right_ref): + break + right_w += 1 + + content_w = w - left_w - right_w + if content_w <= 0: + return None + return content_w, h, left_w, 0 + + @staticmethod + def _probe_dimensions(path: str): + try: + res = subprocess.run( + ["ffprobe", "-v", "error", "-select_streams", "v:0", + "-show_entries", "stream=width,height", + "-of", "csv=p=0", path], + capture_output=True, text=True, timeout=10, + ) + except (FileNotFoundError, subprocess.TimeoutExpired): + return None + if res.returncode != 0: + return None + parts = res.stdout.strip().split(",") + if len(parts) != 2: + return None + try: + w, h = int(parts[0]), int(parts[1]) + except ValueError: + return None + return (w, h) if w > 0 and h > 0 else None + + @staticmethod + def _dump_pixels(path: str, w: int, h: int): + try: + res = subprocess.run( + ["ffmpeg", "-hide_banner", "-loglevel", "error", + "-i", path, "-f", "rawvideo", "-pix_fmt", "rgb24", "-"], + capture_output=True, timeout=15, + ) + except (FileNotFoundError, subprocess.TimeoutExpired): + return None + if res.returncode != 0 or len(res.stdout) != w * h * 3: + return None + return res.stdout + + @staticmethod + def _apply(path: str, w: int, h: int, x: int, y: int) -> None: + suffix = os.path.splitext(path)[1] or ".jpg" + # dir= keeps the temp file on the same filesystem as the target so the + # final os.replace() is an atomic rename, not a cross-device error. + fd, tmp = tempfile.mkstemp(suffix=suffix, dir=os.path.dirname(path) or ".") + os.close(fd) + try: + subprocess.run( + ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y", + "-i", path, "-vf", f"crop={w}:{h}:{x}:{y}", tmp], + check=True, capture_output=True, timeout=15, + ) + os.replace(tmp, path) + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + try: + os.unlink(tmp) + except OSError: + pass + + +def worker(jobs: "queue.Queue") -> None: + while True: + item = jobs.get() + if item is None: + return + job_id, url, preset = item + try: + with yt_dlp.YoutubeDL(make_opts(job_id, preset)) as ydl: + if preset == "mp3": + ydl.add_post_processor(SquareThumbnailCropper(ydl), when="post_process") + ydl.add_post_processor(EmbedThumbnailPP(ydl), when="post_process") + info = ydl.extract_info(url, download=True) + title = (info or {}).get("title") or _job_title.get(job_id) or url + _job_title[job_id] = title + update_job_line(job_id, f"[{ICON_DONE}] {title[:60]}") + except Exception as exc: # noqa: BLE001 + err = str(exc).splitlines()[0][:60] + title = _job_title.get(job_id) or url + update_job_line(job_id, f"[{ICON_FAILED}] {title[:60]} {err}") + + +def main() -> None: + DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) + jobs: "queue.Queue" = queue.Queue() + threading.Thread(target=worker, args=(jobs,), daemon=True).start() + + preset = "mp3" + next_id = 1 + print(f"dlit v.{VERSION} -> {DOWNLOAD_DIR} preset={preset} (:q to quit)") + while True: + try: + line = input(PROMPT).strip() + except (EOFError, KeyboardInterrupt): + print() + break + if not line: + continue + if line.startswith(":"): + cmd = line[1:] + if cmd in ("q", "quit", "exit"): + break + if cmd in PRESETS: + preset = cmd + with _print_lock: + for k in _line_distance: + _line_distance[k] += 2 + sys.stdout.write(f" preset -> {preset}\n") + sys.stdout.flush() + else: + with _print_lock: + for k in _line_distance: + _line_distance[k] += 2 + sys.stdout.write( + f" unknown command: :{cmd} (try :mp3, :video, :none, :q)\n" + ) + sys.stdout.flush() + continue + jobs.put((next_id, line, preset)) + add_job_line(next_id, f"[{ICON_QUEUED}] {line}") + next_id += 1 + + +if __name__ == "__main__": + main() |
