#!/usr/bin/env python3 """DLit — Download It! a tiny terminal wrapper for yt-dlp. Paste a link at the prompt; downloads queue and run serially in the background. Commands: :mp3, :video, :none switch the active preset. Ctrl-D or :q to quit (lets an in-flight download finish; Ctrl-C aborts). v1.8: link resolution runs off the main thread so the prompt never freezes on a slow probe; tracked lines that scroll past the top of the screen are untracked instead of corrupting the display; job bookkeeping is pruned rather than growing forever; playlist truncation at PLAYLIST_LIMIT is reported; quitting drains the queue but finishes the download in flight. """ from __future__ import annotations import itertools import os import queue import re import readline # noqa: F401 (importing enables line editing in input()) import shutil import subprocess import sys import tempfile import threading import time from pathlib import Path import yt_dlp from yt_dlp.postprocessor import EmbedThumbnailPP from yt_dlp.postprocessor.common import PostProcessor VERSION = "1.8" DOWNLOAD_DIR = Path.home() / "Downloads" PROMPT = "dlit> " # A pasted link is first *resolved* (a quick, flat metadata fetch) to decide # whether it is one video or a playlist. A playlist fans out into one queued # job per entry, each labelled "(PlaylistName - i/N)". PLAYLIST_LIMIT bounds how # many entries a single paste may expand into, so a pathological or effectively # endless feed (e.g. youtube.com's recommendations) can't flood the queue. PLAYLIST_LIMIT = 200 SOCKET_TIMEOUT = 30 # seconds; bounds network stalls during resolve + download PRESETS: dict[str, dict] = { "mp3": { "format": "bestaudio/best", "writethumbnail": True, "postprocessors": [ {"key": "FFmpegExtractAudio", "preferredcodec": "mp3", "preferredquality": "320"}, {"key": "FFmpegMetadata"}, # EmbedThumbnail is registered manually in worker() so that # SquareThumbnailCropper can run between metadata-write and embed. ], "postprocessor_args": { "metadata": [ "-metadata", f"comment=DLit {VERSION} (Conway)", "-metadata", "genre=", ], }, }, "video": {"format": "bestvideo+bestaudio/best"}, "none": {}, } class _QuietLogger: """Swallow yt-dlp's own console output. yt-dlp writes errors/warnings straight to stderr even under quiet=True (see YoutubeDL.to_stderr). Those bytes bypass our cursor save/restore line management and land on top of the `dlit>` prompt, corrupting the display — most visibly on an invalid URL, which emits an "ERROR: ..." line. Routing everything through a logger keeps all output under our control; the failure itself still surfaces via the DownloadError that worker() catches. """ def debug(self, msg: str) -> None: ... def info(self, msg: str) -> None: ... def warning(self, msg: str) -> None: ... def error(self, msg: str) -> None: ... _QUIET_LOGGER = _QuietLogger() _ids = itertools.count(1) # job ids; next() on a count is GIL-atomic, so both # the main thread and the resolver may draw from it _shutting_down = False # set on quit: insert_lines stops redrawing the prompt _print_lock = threading.Lock() _line_distance: dict[int, int] = {} # job_id -> rows above the current cursor position _job_title: dict[int, str] = {} # job_id -> resolved title (once known) _job_tag: dict[int, str] = {} # job_id -> "(PlaylistName - i/N) " prefix, "" for singles _RESET = "\x1b[0m" def _icon(sgr: str, ch: str) -> str: return f"\x1b[{sgr}m{ch}{_RESET}" ICON_QUEUED = _icon("35", "~") # purple ICON_DOWNLOADING = _icon("33", "↓") # yellow ICON_CONVERTING = _icon("34", "↻") # blue ICON_DONE = _icon("32", "✓") # green ICON_FAILED = _icon("31", "✗") # red _SGR_RE = re.compile(r"\x1b\[[0-9;]*m") def _clamp(text: str) -> str: """Trim a status line to the terminal width, counting visible columns only. Job lines are rewritten in place by relying on each one occupying exactly one terminal row (see update_job_line). A line wider than the terminal wraps onto the row below — the prompt — and the cursor save/restore math no longer lands where it should, corrupting the display. Clamping keeps every line to a single row. SGR colour codes don't occupy columns, so they're copied through without counting toward the budget. """ width = shutil.get_terminal_size((80, 24)).columns - 1 if width <= 0: return text out: list[str] = [] visible = i = 0 while i < len(text): m = _SGR_RE.match(text, i) if m: out.append(m.group()) i = m.end() continue if visible >= width: return "".join(out) + _RESET # reset in case we cut mid-colour out.append(text[i]) visible += 1 i += 1 return "".join(out) def _bump(n: int) -> None: """Move every tracked line n rows farther from the cursor. Caller must hold _print_lock. A line pushed past the top of the screen can no longer be addressed — cursor-up (CSI A) clamps at row 1, so a later rewrite would land on whatever unrelated line sits at the top. Such lines are untracked here, which also keeps the dicts bounded by the screen height instead of growing for the life of the session. """ height = shutil.get_terminal_size((80, 24)).lines for k in list(_line_distance): d = _line_distance[k] + n if d >= height: del _line_distance[k] else: _line_distance[k] = d def post_line(text: str, job_id: int | None = None) -> None: """Main thread, between prompts: print a new line above the next prompt; tracked when a job_id is given. The just-submitted input line stays on screen above the printed line, so existing tracked lines end up 2 rows farther from the next prompt. """ with _print_lock: _bump(2) if job_id is not None: _line_distance[job_id] = 1 sys.stdout.write(_clamp(text) + "\n") sys.stdout.flush() def insert_lines(items: list[tuple[int | None, str]]) -> None: """Any thread, while the prompt is live: push new lines above it; entries with a job id become tracked, None entries are one-off notices. The prompt row is cleared, the new lines printed where it stood, and the prompt plus whatever the user had typed (readline's buffer) redrawn below, so existing tracked lines move exactly n rows. Known limits: input long enough to wrap onto a second row throws the geometry off by the wrapped rows, and the redraw parks the visible cursor at the end of the typed text even if the user had moved it left (readline's own state is unaffected). """ with _print_lock: n = len(items) _bump(n) sys.stdout.write("\r\x1b[K") for offset, (job_id, text) in enumerate(items): if job_id is not None: _line_distance[job_id] = n - offset sys.stdout.write(_clamp(text) + "\n") if not _shutting_down: sys.stdout.write(PROMPT + readline.get_line_buffer()) sys.stdout.flush() def update_job_line(job_id: int, text: str) -> None: """Worker/resolver thread: rewrite this job's tracked line in place; cursor returns to its prior spot. Untracked (pruned or finished) jobs are a no-op.""" with _print_lock: d = _line_distance.get(job_id) if d is None: return if d >= shutil.get_terminal_size((80, 24)).lines: # Scrolled off since the last _bump (e.g. the terminal shrank); # unreachable by cursor-up, so stop tracking it. del _line_distance[job_id] return sys.stdout.write( "\x1b7" # save cursor (DECSC) f"\x1b[{d}A" # up d rows "\r\x1b[K" # clear that line f"{_clamp(text)}" "\x1b8" # restore cursor (DECRC) ) sys.stdout.flush() def _forget_job(job_id: int) -> None: """Drop a job's bookkeeping once its line is final and will never be rewritten again.""" with _print_lock: _line_distance.pop(job_id, None) _job_title.pop(job_id, None) _job_tag.pop(job_id, None) def make_opts(job_id: int, preset: str) -> dict: last = {"pct": -5.0, "t": 0.0} def progress(d: dict) -> None: info = d.get("info_dict") or {} title = info.get("title") if title: _job_title[job_id] = title status = d.get("status") tag = _job_tag.get(job_id, "") shown_title = tag + _job_title.get(job_id, "…")[:60] if status == "downloading": total = d.get("total_bytes") or d.get("total_bytes_estimate") or 0 done = d.get("downloaded_bytes") or 0 pct = (done * 100.0 / total) if total else 0.0 now = time.monotonic() if pct - last["pct"] >= 5 or now - last["t"] >= 2.0: last["pct"], last["t"] = pct, now speed = d.get("speed") or 0 speed_s = f"{speed / 1_000_000:4.1f} MB/s" if speed else " - MB/s" update_job_line( job_id, f"[{ICON_DOWNLOADING}] {shown_title} {pct:5.1f}% {speed_s}", ) elif status == "finished" and preset == "mp3": update_job_line(job_id, f"[{ICON_CONVERTING}] {shown_title}") return { "outtmpl": str(DOWNLOAD_DIR / "%(title)s.%(ext)s"), "noplaylist": True, "quiet": True, "no_warnings": True, "no_color": True, "noprogress": True, "logger": _QUIET_LOGGER, "socket_timeout": SOCKET_TIMEOUT, "progress_hooks": [progress], **PRESETS[preset], } def resolve_opts() -> dict: """Options for the quick pre-download probe that classifies a pasted link. extract_flat keeps it cheap: playlist entries come back as bare references (id/title/url) without fetching each video. noplaylist=True preserves the download behaviour — a video that merely sits inside a playlist (watch?v=…&list=…) resolves to the single video; only a pure playlist or feed URL fans out. playlistend bounds how far an endless feed is walked. """ return { "quiet": True, "no_warnings": True, "no_color": True, "noplaylist": True, "logger": _QUIET_LOGGER, "socket_timeout": SOCKET_TIMEOUT, "extract_flat": "in_playlist", "playlistend": PLAYLIST_LIMIT, } def resolve(url: str) -> tuple[bool, str, list[tuple[str, str]]]: """Classify a pasted link. Returns (is_playlist, name, [(entry_url, title)]). A single video comes back as (False, title, [(url, title)]). A playlist or feed comes back as (True, playlist_name, [...]) — possibly empty, e.g. a logged-out recommendations feed, in which case nothing is queued. """ with yt_dlp.YoutubeDL(resolve_opts()) as ydl: info = ydl.extract_info(url, download=False) or {} entries = info.get("entries") if entries is None: title = info.get("title") or url return False, title, [(info.get("webpage_url") or url, title)] name = info.get("title") or "playlist" items = [] for e in entries: if not e: continue eurl = e.get("url") or e.get("webpage_url") or e.get("id") if eurl: items.append((eurl, e.get("title") or eurl)) return True, name, items class SquareThumbnailCropper(PostProcessor): """Detect a square cover image padded into a 16:9 YouTube thumbnail and crop the bars off before EmbedThumbnail runs. Works on bars of any solid color (black, white, the album's accent color, etc.) by sampling the leftmost/rightmost columns and walking inward until the column stops matching the edge color. Guarded by aspect ratio: if the detected content region isn't roughly square the original is left untouched, so YouTube videos with natively 16:9 thumbnails or artwork with uniform color edges that fool the sampler just pass through uncropped. """ SQUARE_TOLERANCE = 0.05 # accept detected aspect ratios within ±5% of 1:1 COLOR_TOLERANCE = 12 # per-channel RGB deviation a column may have from edge def run(self, info): path = self._thumbnail_path(info) if not path or not os.path.exists(path): return [], info crop = self._detect(path) if not crop: return [], info w, h, x, y = crop if h <= 0 or abs(w / h - 1.0) > self.SQUARE_TOLERANCE: return [], info self._apply(path, w, h, x, y) return [], info @staticmethod def _thumbnail_path(info: dict): for t in reversed(info.get("thumbnails") or []): fp = t.get("filepath") if fp: return fp return None @classmethod def _detect(cls, path: str): dims = cls._probe_dimensions(path) if not dims: return None w, h = dims buf = cls._dump_pixels(path, w, h) if buf is None: return None sample_ys = [h * i // 6 for i in range(1, 6)] tol = cls.COLOR_TOLERANCE def col_uniform(x: int, ref: tuple) -> bool: for y in sample_ys: i = (y * w + x) * 3 if (abs(buf[i] - ref[0]) > tol or abs(buf[i + 1] - ref[1]) > tol or abs(buf[i + 2] - ref[2]) > tol): return False return True def pix(x: int, y: int) -> tuple: i = (y * w + x) * 3 return buf[i], buf[i + 1], buf[i + 2] left_ref = pix(0, h // 2) left_w = 0 for x in range(w // 2): if not col_uniform(x, left_ref): break left_w = x + 1 right_ref = pix(w - 1, h // 2) right_w = 0 for x in range(w - 1, w // 2 - 1, -1): if not col_uniform(x, right_ref): break right_w += 1 content_w = w - left_w - right_w if content_w <= 0: return None return content_w, h, left_w, 0 @staticmethod def _probe_dimensions(path: str): try: res = subprocess.run( ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "csv=p=0", path], capture_output=True, text=True, timeout=10, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None if res.returncode != 0: return None parts = res.stdout.strip().split(",") if len(parts) != 2: return None try: w, h = int(parts[0]), int(parts[1]) except ValueError: return None return (w, h) if w > 0 and h > 0 else None @staticmethod def _dump_pixels(path: str, w: int, h: int): try: res = subprocess.run( ["ffmpeg", "-hide_banner", "-loglevel", "error", "-i", path, "-f", "rawvideo", "-pix_fmt", "rgb24", "-"], capture_output=True, timeout=15, ) except (FileNotFoundError, subprocess.TimeoutExpired): return None if res.returncode != 0 or len(res.stdout) != w * h * 3: return None return res.stdout @staticmethod def _apply(path: str, w: int, h: int, x: int, y: int) -> None: suffix = os.path.splitext(path)[1] or ".jpg" # dir= keeps the temp file on the same filesystem as the target so the # final os.replace() is an atomic rename, not a cross-device error. fd, tmp = tempfile.mkstemp(suffix=suffix, dir=os.path.dirname(path) or ".") os.close(fd) try: subprocess.run( ["ffmpeg", "-hide_banner", "-loglevel", "error", "-y", "-i", path, "-vf", f"crop={w}:{h}:{x}:{y}", tmp], check=True, capture_output=True, timeout=15, ) os.replace(tmp, path) except (subprocess.CalledProcessError, subprocess.TimeoutExpired): try: os.unlink(tmp) except OSError: pass def resolver(resolves: "queue.Queue", jobs: "queue.Queue") -> None: """Resolve pasted links off the main thread, then queue download jobs. The placeholder line printed at paste time is reused in place for a single video (or a playlist's first entry); further playlist entries are inserted above the live prompt as new tracked lines. Lines are registered before their jobs are enqueued, so a fast worker never tries to update a line whose distance isn't set yet. """ while True: item = resolves.get() if item is None: return job_id, url, preset = item try: is_playlist, name, entries = resolve(url) except Exception as exc: # noqa: BLE001 err = str(exc).splitlines()[0][:60] update_job_line(job_id, f"[{ICON_FAILED}] {url[:60]} {err}") _forget_job(job_id) continue if not entries: update_job_line(job_id, f"[{ICON_FAILED}] nothing to download in '{name[:40]}'") _forget_job(job_id) continue total = len(entries) ids = [job_id] + [next(_ids) for _ in range(total - 1)] for i, (jid, (_eurl, title)) in enumerate(zip(ids, entries), 1): _job_title[jid] = title if is_playlist: _job_tag[jid] = f"({name[:24]} - {i}/{total}) " update_job_line(job_id, f"[{ICON_QUEUED}] {_job_tag.get(job_id, '')}{entries[0][1]}") extra: list[tuple[int | None, str]] = [ (jid, f"[{ICON_QUEUED}] {_job_tag.get(jid, '')}{title}") for jid, (_eurl, title) in zip(ids[1:], entries[1:]) ] if total >= PLAYLIST_LIMIT: # yt-dlp gives no overflow signal under playlistend, so hitting the # limit exactly is the best available proxy for truncation. extra.append((None, f" '{name[:40]}' stopped at {PLAYLIST_LIMIT} entries")) if extra: insert_lines(extra) for jid, (eurl, _title) in zip(ids, entries): jobs.put((jid, eurl, preset)) def worker(jobs: "queue.Queue") -> None: while True: item = jobs.get() if item is None: return job_id, url, preset = item tag = _job_tag.get(job_id, "") try: with yt_dlp.YoutubeDL(make_opts(job_id, preset)) as ydl: if preset == "mp3": ydl.add_post_processor(SquareThumbnailCropper(ydl), when="post_process") ydl.add_post_processor(EmbedThumbnailPP(ydl), when="post_process") info = ydl.extract_info(url, download=True) title = (info or {}).get("title") or _job_title.get(job_id) or url update_job_line(job_id, f"[{ICON_DONE}] {tag}{title[:60]}") except Exception as exc: # noqa: BLE001 err = str(exc).splitlines()[0][:60] title = _job_title.get(job_id) or url update_job_line(job_id, f"[{ICON_FAILED}] {tag}{title[:60]} {err}") _forget_job(job_id) def _drain(q: "queue.Queue") -> None: try: while True: q.get_nowait() except queue.Empty: pass def main() -> None: global _shutting_down DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) resolves: "queue.Queue" = queue.Queue() jobs: "queue.Queue" = queue.Queue() threading.Thread(target=resolver, args=(resolves, jobs), daemon=True).start() worker_t = threading.Thread(target=worker, args=(jobs,), daemon=True) worker_t.start() preset = "mp3" print(f"dlit v.{VERSION} -> {DOWNLOAD_DIR} preset={preset} (:q to quit)") while True: try: line = input(PROMPT).strip() except KeyboardInterrupt: # Hard abort: don't wait for in-flight work (daemon threads die). print() return except EOFError: print() break if not line: # The empty submit still pushed the prompt down one row. with _print_lock: _bump(1) continue if line.startswith(":"): cmd = line[1:] if cmd in ("q", "quit", "exit"): break if cmd in PRESETS: preset = cmd post_line(f" preset -> {preset}") else: post_line(f" unknown command: :{cmd} (try :mp3, :video, :none, :q)") continue # A pasted link: hand it to the resolver thread so the prompt stays # live; the placeholder line is updated (and fanned out) in place. job_id = next(_ids) post_line(f"[{ICON_QUEUED}] resolving… {line}", job_id) resolves.put((job_id, line, preset)) # Graceful quit (:q / Ctrl-D): discard everything still waiting, but let # an in-flight download finish so no half-written file is left behind. _shutting_down = True with _print_lock: _bump(1) # the :q / Ctrl-D submit pushed the cursor down one row _drain(resolves) resolves.put(None) _drain(jobs) jobs.put(None) worker_t.join(timeout=0.2) if worker_t.is_alive(): insert_lines([(None, " finishing current download — Ctrl-C to abort")]) try: worker_t.join() except KeyboardInterrupt: print() if __name__ == "__main__": main()