Clean Torrent Downloader — No Bloat, No Bundled Malware, Just Downloads (updated 11.03.2026)
A one-script torrent client built from scratch. No uTorrent adware. No qBittorrent config rabbit holes. Just pick a file and go.
Most torrent clients install toolbars, crypto miners, or “recommended software” you never asked for. This one is a single Python script — open source, no installer, no background processes, no surprises.
Think of it as a remote control for aria2 — a fast command-line download engine that may already be installed on your machine (or installs in one command). The GUI just makes it human-friendly.
🖥️ What It Looks Like
Waiting for a torrent — clean, simple interface:
Downloading at 9.9 MB/s in Aggressive mode — 82 peers connected:
Three speed modes to match your connection:
⚡ Why This Instead of uTorrent / qBittorrent / BiglyBT
| Problem With Others | This Tool |
|---|---|
| Bundled adware / toolbars | No installer — one .py file (plus aria2 and PyQt6, see Step 1) |
| Background processes eating RAM | Runs only when you open it, dies when you close it |
| Confusing settings pages | 3 speed modes: Safe, Balanced, Aggressive — pick one |
| Forced updates / premium nags | Open source — no accounts, no subscriptions, no nagging |
| Crypto miners hiding in installers | Readable source code — see exactly what runs |
🛠️ How to Use It (3 Steps)
Step 1 — Install the engine (one time only)
| OS | Command |
|---|---|
| Windows | choco install aria2 or download from aria2 GitHub releases |
| Linux | sudo apt install aria2 |
| Mac | brew install aria2 |
You also need PyQt6: pip install PyQt6
Step 2 — Run the script
Save the code below as torrent.py, then:
python torrent.py
Step 3 — Download
Click Select .torrent → pick your .torrent file → choose a speed mode → watch it fly.
Resume support is built in. Close the app mid-download, reopen it, load the same torrent — it picks up where you left off. To start fresh, delete aria2's control files (the ones ending in .aria2) from your Downloads folder.
📋 The Full Source Code
#!/usr/bin/env python3
"""Torrent downloader GUI powered by PyQt6 + aria2 RPC."""
from __future__ import annotations
import base64
import json
import os
import shutil
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
import webbrowser
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Optional
from PyQt6 import QtCore, QtWidgets
# --- Behaviour switches -----------------------------------------------------
# When False, build_options() drops the "bt-request-peer-speed-limit" entry
# from the Aggressive preset instead of forcing it to "0".
FORCE_UNLIMITED_PEER_SPEED = False

# --- RPC endpoint discovery ---------------------------------------------------
DEFAULT_RPC_HOST = "127.0.0.1"
# Ports probed (in this order) for an existing aria2 RPC server and tried
# first when spawning a local one.
PREFERRED_PORTS = (
    6800, 6801, 6802, 6803,
    6880, 6881, 6969,
    8080, 8081,
    5000, 8888, 9090, 10000, 16800,
)

# --- Timeouts and intervals (seconds) ---------------------------------------
RPC_PROBE_TIMEOUT_S = 0.2
STATUS_TIMEOUT_S = 1.8
RPC_FAST_TIMEOUT_S = 1.2
TURBO_MAINTENANCE_INTERVAL_S = 45.0
TURBO_FAIL_BACKOFF_S = 90.0

# --- Speed presets shown in the mode combo box --------------------------------
# Keys are the user-visible labels; values are aria2 option overrides.
MODES: Dict[str, Dict[str, str]] = {
    "🐢 Safe": {
        "max-connection-per-server": "4",
        "split": "4",
        "bt-max-peers": "70",
        "bt-request-peer-speed-limit": "64K",
    },
    "⚖️ Balanced": {
        "max-connection-per-server": "16",
        "split": "16",
        "bt-max-peers": "220",
        "bt-request-peer-speed-limit": "32K",
    },
    "🔥 Aggressive": {
        "max-connection-per-server": "16",
        "split": "32",
        "bt-max-peers": "600",
        "bt-request-peer-speed-limit": "0",
    },
}

# --- Extra trackers passed to aria2 via "bt-tracker" --------------------------
TRACKERS = [
    "udp://tracker.opentrackr.org:1337/announce",
    "udp://open.demonii.com:1337/announce",
    "udp://tracker.torrent.eu.org:451/announce",
    "udp://tracker.coppersurfer.tk:6969/announce",
    "udp://tracker.openbittorrent.com:6969/announce",
    "udp://tracker.zer0day.to:1337/announce",
    "udp://tracker.cyberia.is:6969/announce",
    "udp://explodie.org:6969/announce",
    "udp://tracker.moeking.me:6969/announce",
    "udp://9.rarbg.com:2710/announce",
    "udp://tracker.dler.org:6969/announce",
    "udp://tracker1.bt.moack.co.kr:80/announce",
    "udp://tracker.srv00.com:6969/announce",
]
def resolve_download_dir() -> Path:
    """Return the Downloads directory, creating it and checking it is writable.

    The directory under ``%USERPROFILE%`` is used when that variable is set;
    otherwise the current user's home directory is used. A small probe file is
    written and removed again, so an unwritable location raises the underlying
    ``OSError`` here rather than later during a download.
    """
    profile_root = os.environ.get("USERPROFILE", "")
    target = Path(profile_root).expanduser().joinpath("Downloads")
    # An empty USERPROFILE collapses to the relative path "Downloads".
    if str(target).strip() in ("", "Downloads"):
        target = Path.home().joinpath("Downloads")
    target.mkdir(parents=True, exist_ok=True)

    marker = target.joinpath(".write_test.tmp")
    marker.write_text("ok", encoding="utf-8")
    marker.unlink(missing_ok=True)
    return target
def rpc_url(port: int) -> str:
    """Return the JSON-RPC endpoint URL for *port* on ``DEFAULT_RPC_HOST``."""
    return "http://{host}:{port}/jsonrpc".format(host=DEFAULT_RPC_HOST, port=port)
def _post_jsonrpc(method: str, params: Optional[list], rpc_endpoint: str, timeout_s: float) -> dict:
    """POST one JSON-RPC 2.0 call and return the decoded response object.

    Shared transport for ``aria2_request`` and ``aria2_raw_request``.

    Raises:
        RuntimeError: on HTTP errors, connection failures, timeouts, an
            undecodable response body, or a JSON-RPC ``error`` member.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": "req",
        "method": method,
        "params": params or [],
    }
    req = urllib.request.Request(
        rpc_endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            raw = resp.read()
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"HTTP {exc.code} {exc.reason}\n{body}") from None
    except urllib.error.URLError as exc:
        raise RuntimeError(f"RPC unavailable ({rpc_endpoint}): {exc.reason}") from None
    except OSError as exc:
        # Timeouts/resets while reading the body are not wrapped in URLError.
        # Same message prefix, so callers matching on "rpc unavailable" or
        # "timed out" keep working.
        raise RuntimeError(f"RPC unavailable ({rpc_endpoint}): {exc}") from None

    try:
        data = json.loads(raw)
    except ValueError:
        raise RuntimeError(f"Invalid JSON-RPC response from {rpc_endpoint}") from None
    if not isinstance(data, dict):
        raise RuntimeError(f"Unexpected JSON-RPC response from {rpc_endpoint}")

    if "error" in data:
        error = data["error"]
        if isinstance(error, dict):
            message = error.get("message", "Unknown error")
        else:
            message = str(error)
        raise RuntimeError(message)
    return data


def aria2_request(
    method: str,
    params: Optional[list] = None,
    rpc_endpoint: str = rpc_url(6800),
    timeout_s: float = 8.0,
) -> dict:
    """Call ``aria2.<method>`` over JSON-RPC and return the full response dict.

    Args:
        method: aria2 method name without the ``aria2.`` prefix.
        params: positional RPC parameters (``None`` means no parameters).
        rpc_endpoint: JSON-RPC URL of the aria2 server.
        timeout_s: socket timeout in seconds.

    Raises:
        RuntimeError: on transport failure or when aria2 returns an error.
    """
    return _post_jsonrpc(f"aria2.{method}", params, rpc_endpoint, timeout_s)


def aria2_raw_request(
    method: str,
    params: Optional[list] = None,
    rpc_endpoint: str = rpc_url(6800),
    timeout_s: float = 8.0,
) -> dict:
    """Call a fully-qualified RPC method (e.g. ``system.multicall``).

    Identical to ``aria2_request`` except that *method* is sent verbatim.

    Raises:
        RuntimeError: on transport failure or when aria2 returns an error.
    """
    return _post_jsonrpc(method, params, rpc_endpoint, timeout_s)
def human_bytes(value: str | int) -> str:
    """Format a byte count as a short human-readable string.

    Args:
        value: byte count as an int or a decimal string (aria2 reports
            numbers as strings).

    Returns:
        The value with one decimal and a unit from B up to PB, using 1024 as
        the step. Negative values are scaled by magnitude and keep their sign.

    Raises:
        ValueError: if *value* cannot be converted with ``int()``.
    """
    n = float(int(value))
    for unit in ("B", "KB", "MB", "GB", "TB"):
        # Compare the magnitude so that negative numbers are scaled as well.
        if abs(n) < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} PB"
def pick_free_port(start: int = 6800, stop: int = 6999) -> int:
    """Return the first port in ``[start, stop]`` that looks free on the RPC host.

    A port is accepted only if nothing accepts connections on it *and* it can
    be bound. The connect probe alone would report a port that is bound but
    not listening as free.

    Raises:
        RuntimeError: if no port in the range is free.
    """
    for port in range(start, stop + 1):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            if probe.connect_ex((DEFAULT_RPC_HOST, port)) == 0:
                continue  # a server is already listening here
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as binder:
            try:
                # No SO_REUSEADDR: on Windows it would let the bind succeed
                # even when another process already owns the port.
                binder.bind((DEFAULT_RPC_HOST, port))
            except OSError:
                continue
        return port
    raise RuntimeError(f"Unable to find a free RPC port ({start}-{stop}).")
class TorrentWindow(QtWidgets.QWidget):
def __init__(self) -> None:
    """Build the window, initialise all state, and start RPC bootstrap."""
    super().__init__()
    self.download_dir = resolve_download_dir()

    # --- aria2 process / RPC endpoint ---
    self.aria2_proc: Optional[subprocess.Popen] = None
    self.owns_aria2_process = False
    self.rpc_port = 6800
    self.rpc_endpoint = rpc_url(self.rpc_port)
    self.executor = ThreadPoolExecutor(max_workers=3)

    # --- current download task ---
    self.gid_current: Optional[str] = None
    self.is_paused = False
    self.last_known_state = ""

    # --- background futures polled by the timers below ---
    self.add_in_flight = False
    self.progress_request_in_flight = False
    self.add_future: Optional[Future] = None
    self.status_future: Optional[Future] = None
    self.peer_discovery_future: Optional[Future] = None
    self.startup_future: Optional[Future] = None
    self.reconnect_future: Optional[Future] = None

    # --- status polling / speed measurement ---
    self.status_tick = 0
    self.status_timeout_streak = 0
    self.last_completed_length = 0
    self.last_speed_ts = time.time()
    self.speed_ema = 0.0
    self.no_seed_streak = 0
    self.last_status_line = ""
    self.last_speed_line = ""

    # --- turbo tuning ---
    self.turbo_sticky_enabled = False
    self.turbo_last_apply_ts = 0.0
    self.turbo_apply_in_flight = False
    self.turbo_apply_future: Optional[Future] = None
    self.turbo_apply_show_popup = False
    self.turbo_fail_streak = 0
    self.turbo_backoff_until = 0.0
    self.turbo_last_profile = ""
    self.turbo_last_maintenance_ts = 0.0

    # --- widgets ---
    self.setWindowTitle("📥 Torrent Downloader (PyQt6)")
    self.setFixedSize(620, 450)

    self.status_label = QtWidgets.QLabel("Waiting for .torrent...")
    self.rpc_label = QtWidgets.QLabel("RPC: checking...")
    self.progress = QtWidgets.QProgressBar()
    self.progress.setRange(0, 100)
    self.speed_label = QtWidgets.QLabel("0 B/s")
    self.file_label = QtWidgets.QLabel("")

    self.mode_box = QtWidgets.QComboBox()
    self.mode_box.addItems(MODES.keys())
    self.mode_box.setCurrentText("⚖️ Balanced")

    self.select_button = QtWidgets.QPushButton("📂 Select .torrent")
    self.pause_button = QtWidgets.QPushButton("▶️ Start")
    self.open_button = QtWidgets.QPushButton("📁 Open folder")
    self.tune_button = QtWidgets.QPushButton("⚡ Apply turbo settings")
    self.reconnect_button = QtWidgets.QPushButton("🔌 Reconnect RPC")

    self.hints = QtWidgets.QPlainTextEdit()
    self.hints.setReadOnly(True)
    self.hints.setMaximumHeight(160)

    # --- layout ---
    root = QtWidgets.QVBoxLayout(self)
    for widget in (
        self.status_label,
        self.rpc_label,
        self.progress,
        self.speed_label,
        self.file_label,
    ):
        root.addWidget(widget)

    mode_row = QtWidgets.QHBoxLayout()
    mode_row.addWidget(QtWidgets.QLabel("⚙️ Speed mode:"))
    mode_row.addWidget(self.mode_box)
    root.addLayout(mode_row)

    action_row = QtWidgets.QHBoxLayout()
    for button in (self.select_button, self.pause_button, self.open_button):
        action_row.addWidget(button)
    root.addLayout(action_row)

    rpc_row = QtWidgets.QHBoxLayout()
    for button in (self.tune_button, self.reconnect_button):
        rpc_row.addWidget(button)
    root.addLayout(rpc_row)

    root.addWidget(QtWidgets.QLabel(f"📁 Downloading to: {self.download_dir}"))
    root.addWidget(QtWidgets.QLabel("Speed diagnostics:"))
    root.addWidget(self.hints)

    # --- timers (created stopped; each one polls a single concern) ---
    def make_timer(interval_ms: int, slot) -> QtCore.QTimer:
        timer = QtCore.QTimer(self)
        timer.setInterval(interval_ms)
        timer.timeout.connect(slot)
        return timer

    self.timer = make_timer(700, self.update_progress)
    self.add_timer = make_timer(60, self._poll_add_future)
    self.turbo_timer = make_timer(20000, self._maintain_turbo)
    self.turbo_poll_timer = make_timer(120, self._poll_turbo_future)
    self.startup_timer = make_timer(80, self._poll_startup_future)
    self.reconnect_timer = make_timer(100, self._poll_reconnect_future)

    # --- button wiring ---
    for button, handler in (
        (self.select_button, self.choose_torrent),
        (self.pause_button, self.start_pause_resume),
        (self.open_button, lambda: webbrowser.open(str(self.download_dir))),
        (self.tune_button, self.apply_turbo),
        (self.reconnect_button, self.reconnect_rpc),
    ):
        button.clicked.connect(handler)

    # Controls stay disabled until the asynchronous RPC bootstrap completes.
    self._set_rpc_controls_ready(False)
    self._start_rpc_bootstrap_async()
    self.refresh_speed_hints()
def _set_rpc_controls_ready(self, ready: bool) -> None:
    """Enable or disable the RPC-dependent buttons.

    Select/Start additionally require that no add-torrent job is running;
    Reconnect depends only on that second condition.
    """
    idle = not self.add_in_flight
    for button in (self.select_button, self.pause_button):
        button.setEnabled(ready and idle)
    self.tune_button.setEnabled(ready)
    self.reconnect_button.setEnabled(idle)
def _start_rpc_bootstrap_async(self) -> None:
    """Kick off RPC connection on the worker pool unless one is already running."""
    pending = self.startup_future
    if pending is not None and not pending.done():
        return
    self.rpc_label.setText("RPC: connecting asynchronously...")
    # prefer_local=False: try an already-running aria2 before spawning one.
    self.startup_future = self.executor.submit(self.start_or_connect_aria2, False)
    if self.startup_timer.isActive():
        return
    self.startup_timer.start()
def _poll_startup_future(self) -> None:
    """Timer slot: publish the outcome of the RPC bootstrap once it finishes."""
    pending = self.startup_future
    if pending is None:
        self.startup_timer.stop()
        return
    if not pending.done():
        return

    self.startup_future = None
    self.startup_timer.stop()
    try:
        pending.result()
        self._set_rpc_controls_ready(True)
        self.rpc_label.setText(f"RPC: connected on port {self.rpc_port}")
        self.status_label.setText("Ready")
    except Exception as exc:
        self._set_rpc_controls_ready(False)
        self.rpc_label.setText("RPC: unavailable")
        self.status_label.setText(f"RPC startup failed: {exc}")
def rpc_call(self, method: str, params: Optional[list] = None) -> dict:
    """Call ``aria2.<method>`` on the current endpoint with the default timeout."""
    return aria2_request(method, params, rpc_endpoint=self.rpc_endpoint)
def rpc_call_fast(self, method: str, params: Optional[list] = None, timeout_s: float = RPC_FAST_TIMEOUT_S) -> dict:
    """Call ``aria2.<method>`` on the current endpoint with a short timeout."""
    return aria2_request(
        method,
        params,
        rpc_endpoint=self.rpc_endpoint,
        timeout_s=timeout_s,
    )
def _connect_existing_rpc(self) -> bool:
    """Attach to the first aria2 RPC server answering on ``PREFERRED_PORTS``.

    Returns:
        True if a server answered ``getVersion`` (endpoint and port are then
        updated), False if none did.

    Ownership is kept when the answering server is the aria2 process this
    window spawned itself. Clearing the flag in that case made a later
    reconnect start a second aria2 and lose track of the first.
    """
    proc = self.aria2_proc
    owned_alive = self.owns_aria2_process and proc is not None and proc.poll() is None
    owned_port = self.rpc_port if owned_alive else None

    for port in PREFERRED_PORTS:
        endpoint = rpc_url(port)
        try:
            aria2_request("getVersion", rpc_endpoint=endpoint, timeout_s=RPC_PROBE_TIMEOUT_S)
        except Exception:
            continue
        self.rpc_endpoint = endpoint
        self.rpc_port = port
        # Our own live process is bound to owned_port, so a reply there is ours.
        self.owns_aria2_process = port == owned_port
        return True
    return False
def _stop_owned_aria2(self) -> None:
    """Stop the aria2 process this window spawned (if any) and forget it.

    The process is terminated and then waited for, so it is reaped and its
    RPC port is released before a caller starts a replacement. If it ignores
    the terminate request it is killed.
    """
    proc = self.aria2_proc
    if self.owns_aria2_process and proc is not None:
        if proc.poll() is None:
            proc.terminate()
            try:
                proc.wait(timeout=2.0)
            except subprocess.TimeoutExpired:
                proc.kill()
                proc.wait()
        if proc.stderr is not None:
            proc.stderr.close()
    self.aria2_proc = None
    self.owns_aria2_process = False
def _start_local_aria2(self, preferred_ports: Optional[list[int]] = None) -> None:
    """Spawn a local aria2 RPC server, trying several profiles and ports.

    Profiles are tried from most to least feature-rich (turbo, balanced,
    minimal); within each profile every candidate port is tried in order.

    Args:
        preferred_ports: ports to try first; defaults to ``PREFERRED_PORTS``.

    Raises:
        RuntimeError: if ``aria2c`` is not on PATH or no profile/port
            combination produced a responding RPC server.
    """
    if not shutil.which("aria2c"):
        raise RuntimeError(
            "aria2c was not found in PATH. Install aria2 and try again.\n"
            "Windows: choco install aria2\n"
            "Ubuntu/Debian: sudo apt install aria2"
        )

    # dict.fromkeys de-duplicates while keeping the caller's order.
    candidate_ports = list(dict.fromkeys(preferred_ports or PREFERRED_PORTS))
    try:
        extra_port = pick_free_port()
    except (RuntimeError, OSError):
        # The extra port is only a bonus candidate; its absence must not
        # prevent trying the preferred ports.
        extra_port = None
    if extra_port is not None and extra_port not in candidate_ports:
        candidate_ports.append(extra_port)

    last_error = ""
    profiles = [
        self._compatibility_profile_turbo,
        self._compatibility_profile_balanced,
        self._compatibility_profile_minimal,
    ]
    for builder in profiles:
        for port in candidate_ports:
            try:
                self._start_aria2_on_port(port, builder(port))
                return
            except Exception as exc:
                last_error = str(exc)
    raise RuntimeError(
        "Failed to start local aria2 RPC on all candidate ports.\n"
        f"Last error: {last_error}"
    )
def start_or_connect_aria2(
    self,
    prefer_local: bool = True,
    force_restart_local: bool = False,
    preferred_ports: Optional[list[int]] = None,
) -> None:
    """Make sure ``self.rpc_endpoint`` points at a responding aria2 server.

    Args:
        prefer_local: try to spawn a dedicated local aria2 before looking
            for an already-running server.
        force_restart_local: stop the aria2 process owned by this window
            first.
        preferred_ports: forwarded to ``_start_local_aria2``.

    Raises:
        RuntimeError: from the final local start attempt when neither a
            local nor an existing server could be reached.
    """
    if force_restart_local:
        self._stop_owned_aria2()

    if prefer_local:
        try:
            self._start_local_aria2(preferred_ports=preferred_ports)
        except Exception:
            # Deliberate: fall through to an existing RPC server instead.
            pass
        else:
            return

    if self._connect_existing_rpc():
        return

    # Last resort: start locally again so the caller sees the detailed error.
    self._start_local_aria2(preferred_ports=preferred_ports)
def _compatibility_profile_minimal(self, port: int) -> list[str]:
    """Return the smallest aria2c command line that exposes RPC on *port*."""
    # NOTE(review): --rpc-allow-origin-all is passed and no --rpc-secret is
    # visible here; confirm that exposing an unauthenticated RPC to browser
    # origins is intended.
    rpc_flags = [
        "--enable-rpc",
        f"--rpc-listen-port={port}",
        "--rpc-listen-all=false",
        "--rpc-allow-origin-all",
        "--rpc-max-request-size=32M",
    ]
    storage_flags = [
        f"--dir={self.download_dir}",
        "--file-allocation=none",
    ]
    return ["aria2c", *rpc_flags, *storage_flags]
def _compatibility_profile_balanced(self, port: int) -> list[str]:
    """Return the minimal command line plus the standard BitTorrent flags."""
    command = list(self._compatibility_profile_minimal(port))
    command.extend(
        [
            "--rpc-max-request-size=64M",
            "--allow-overwrite=true",
            "--continue=true",
            "--max-concurrent-downloads=5",
            "--bt-enable-lpd=true",
            "--enable-dht=true",
            "--enable-dht6=false",
            "--bt-enable-peer-exchange=true",
            "--seed-time=0",
            "--max-overall-download-limit=0",
            "--max-overall-upload-limit=0",
            f"--bt-tracker={','.join(TRACKERS)}",
        ]
    )
    return command
def _compatibility_profile_turbo(self, port: int) -> list[str]:
    """Return the balanced command line plus the two extra tuning flags."""
    command = list(self._compatibility_profile_balanced(port))
    command.append("--bt-detach-seed-only=true")
    command.append("--optimize-concurrent-downloads=true")
    return command
def _start_aria2_on_port(self, port: int, aria2_args: list[str]) -> None:
    """Spawn aria2 with *aria2_args* and wait briefly for its RPC on *port*.

    On success the process is recorded as owned and the endpoint/port
    attributes are updated.

    stderr goes to an anonymous temporary file rather than a pipe. A pipe
    that nobody reads can fill up and block a long-running child; the file
    can still be read back if the child dies during startup.

    Raises:
        RuntimeError: if aria2 exits during startup or its RPC does not
            answer within the probe window.
    """
    import tempfile

    endpoint = rpc_url(port)
    with tempfile.TemporaryFile(mode="w+", encoding="utf-8", errors="replace") as err_log:
        proc = subprocess.Popen(aria2_args, stdout=subprocess.DEVNULL, stderr=err_log)
        for _ in range(8):
            if proc.poll() is not None:
                err_log.seek(0)
                stderr_text = err_log.read().strip()
                extra = f" | {stderr_text[:200]}" if stderr_text else ""
                raise RuntimeError(
                    f"aria2 exited immediately after start (port {port}, code {proc.returncode}){extra}"
                )
            try:
                aria2_request("getVersion", rpc_endpoint=endpoint, timeout_s=RPC_PROBE_TIMEOUT_S)
            except Exception:
                time.sleep(0.05)
                continue
            self.aria2_proc = proc
            self.owns_aria2_process = True
            self.rpc_port = port
            self.rpc_endpoint = endpoint
            return

        # RPC never answered: stop the child and reap it so no zombie stays.
        proc.terminate()
        try:
            proc.wait(timeout=2.0)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
    raise RuntimeError(f"aria2 RPC is not responding on port {port}")
def _reconnect_rpc_worker(self) -> tuple[bool, str]:
    """Worker: re-establish RPC; return ``(ok, endpoint_or_error_text)``."""
    try:
        self.start_or_connect_aria2(prefer_local=False, force_restart_local=False)
        if self.owns_aria2_process:
            return True, self.rpc_endpoint
        # Attached to a server we do not own: replace it with a local one.
        self.start_or_connect_aria2(
            prefer_local=True,
            force_restart_local=True,
            preferred_ports=list(PREFERRED_PORTS),
        )
        return True, self.rpc_endpoint
    except Exception as exc:
        return False, str(exc)
def reconnect_rpc(self) -> None:
    """Button slot: start an asynchronous RPC reconnect unless one is running."""
    running = self.reconnect_future
    if running is not None and not running.done():
        return
    self.reconnect_button.setEnabled(False)
    self.rpc_label.setText("RPC: reconnecting...")
    self.reconnect_future = self.executor.submit(self._reconnect_rpc_worker)
    if self.reconnect_timer.isActive():
        return
    self.reconnect_timer.start()
def _poll_reconnect_future(self) -> None:
    """Timer slot: report the result of a finished reconnect attempt."""
    pending = self.reconnect_future
    if pending is None:
        self.reconnect_timer.stop()
        return
    if not pending.done():
        return

    self.reconnect_future = None
    self.reconnect_timer.stop()
    self.reconnect_button.setEnabled(True)

    ok, payload = pending.result()
    if not ok:
        self.rpc_label.setText("RPC: reconnect failed")
        QtWidgets.QMessageBox.critical(self, "RPC", payload)
        return

    QtWidgets.QMessageBox.information(self, "RPC", f"Connected to {payload}")
    self.rpc_label.setText(f"RPC: connected to {payload}")
    self._set_rpc_controls_ready(True)
def _looks_like_rpc_connection_drop(self, error_text: str) -> bool:
lowered = error_text.lower()
markers = (
"rpc unavailable",
"connection reset",
"connection aborted",
"connection refused",
"timed out",
"10053",
"10054",
"10061",
)
return any(marker in lowered for marker in markers)
def _add_torrent_with_fallback(self, torrent_path: Path, b64_payload: str, options: Dict[str, str]) -> str:
    """Submit the torrent via ``addTorrent``, restarting local aria2 on transport errors.

    Up to three rounds are made. Each round first submits to the current
    endpoint. If that fails with a transport-looking error (or an HTTP 5xx),
    local aria2 is force-restarted on that round's port group and the
    submission is repeated. ``addUri(file://...)`` is intentionally not used
    because aria2 may reject local file URIs.

    Returns:
        The gid assigned by aria2.

    Raises:
        RuntimeError: immediately for non-transport errors (e.g. invalid
            torrent data), or after all rounds have failed.
    """

    def submit() -> str:
        reply = self.rpc_call("addTorrent", [b64_payload, [], options])
        return str(reply["result"])

    ordered_ports = [self.rpc_port, *(p for p in PREFERRED_PORTS if p != self.rpc_port)]
    head, middle = ordered_ports[:4], ordered_ports[4:8]
    # Small port groups first so the common case does not wait long.
    rotations = (head, middle or head, ordered_ports)

    last_error = ""
    for port_group in rotations:
        try:
            return submit()
        except Exception as first_exc:
            last_error = str(first_exc)

        transport_failure = self._looks_like_rpc_connection_drop(last_error) or "HTTP 5" in last_error
        if not transport_failure:
            raise RuntimeError(last_error) from None

        try:
            self.start_or_connect_aria2(
                prefer_local=True,
                force_restart_local=True,
                preferred_ports=list(port_group),
            )
            return submit()
        except Exception as retry_exc:
            last_error = str(retry_exc)

    raise RuntimeError(
        "Unable to add torrent after multiple local RPC restarts and port rotations.\n"
        f"File: {torrent_path}\n"
        f"Last error: {last_error}"
    )
def build_options(self, mode: str) -> Dict[str, str]:
    """Return the per-download aria2 options for the given speed *mode*.

    The base options are overlaid with the ``MODES`` preset (an unknown mode
    contributes nothing), then three numeric options are clamped:
    connections per server to 1..16, split to 1..32, and max peers to at
    least 50.
    """
    tuning = dict(MODES.get(mode, {}))
    if mode == "🔥 Aggressive" and not FORCE_UNLIMITED_PEER_SPEED:
        tuning.pop("bt-request-peer-speed-limit", None)

    connections = int(tuning.get("max-connection-per-server", "16"))
    connections = min(16, max(1, connections))
    pieces = int(tuning.get("split", "16"))
    pieces = min(32, max(1, pieces))
    peer_cap = max(50, int(tuning.get("bt-max-peers", "150")))

    return {
        "dir": str(self.download_dir),
        "allow-overwrite": "true",
        "auto-file-renaming": "false",
        "follow-torrent": "true",
        "enable-dht": "true",
        "bt-enable-lpd": "true",
        "bt-enable-peer-exchange": "true",
        "seed-time": "0",
        "max-upload-limit": "0",
        "max-download-limit": "0",
        "min-split-size": "1M",
        "bt-tracker": ",".join(TRACKERS),
        **tuning,
        "max-connection-per-server": str(connections),
        "split": str(pieces),
        "bt-max-peers": str(peer_cap),
    }
def _set_add_busy(self, busy: bool) -> None:
    """Record whether an add-torrent job is running and update the buttons."""
    self.add_in_flight = busy
    can_interact = (not busy) and (self.startup_future is None)
    self.select_button.setEnabled(can_interact)
    self.pause_button.setEnabled(can_interact)
    self.reconnect_button.setEnabled(not busy)
    if not busy:
        return
    self.status_label.setText("Adding torrent...")
def _extract_infohash(self, text: str) -> Optional[str]:
import re
match = re.search(r"([0-9a-fA-F]{40})", text)
if not match:
return None
return match.group(1).lower()
def _find_existing_gid_by_infohash(self, infohash: str) -> Optional[str]:
    """Search active, waiting and stopped tasks for one with *infohash*.

    Only the first 20 waiting and the first 20 stopped tasks are queried.
    Returns the gid of the first match (None if that gid is empty), or None
    when nothing matches. A failing query is skipped.
    """
    wanted_keys = ["gid", "infoHash", "status"]
    queries = (
        ("tellActive", []),
        ("tellWaiting", [0, 20]),
        ("tellStopped", [0, 20]),
    )
    for rpc_method, leading_params in queries:
        try:
            entries = self.rpc_call(rpc_method, [*leading_params, wanted_keys])["result"]
        except Exception:
            continue
        if isinstance(entries, dict):
            entries = [entries]
        for entry in entries:
            if str(entry.get("infoHash", "")).lower() != infohash:
                continue
            return str(entry.get("gid", "")) or None
    return None
def _handle_duplicate_torrent_error(self, error_message: str) -> bool:
    """Attach to the existing task when aria2 reports a duplicate torrent.

    Returns True if an info-hash was found in *error_message* and a matching
    task exists. The window then tracks that task and polling is started.
    Returns False otherwise, leaving all state untouched.
    """
    infohash = self._extract_infohash(error_message)
    gid = self._find_existing_gid_by_infohash(infohash) if infohash else None
    if not gid:
        return False

    self.gid_current = gid
    self.is_paused = False
    self.pause_button.setText("⏸ Pause")
    self.timer.start()
    self.status_label.setText("Torrent already registered, attached to existing task")
    QtWidgets.QMessageBox.information(
        self,
        "Already added",
        "This torrent is already in aria2. Attached to existing download task.",
    )
    return True
def _add_torrent_worker(self, torrent_path: Path, mode: str) -> tuple[bool, str, str]:
    """Worker: read the torrent file and submit it to aria2.

    Returns ``(True, gid, mode)`` on success or ``(False, error_text, mode)``
    on any failure. It never raises.
    """
    try:
        encoded = base64.b64encode(torrent_path.read_bytes()).decode("utf-8")
        gid = self._add_torrent_with_fallback(torrent_path, encoded, self.build_options(mode))
    except Exception as exc:
        return False, str(exc), mode
    return True, gid, mode
def _on_add_torrent_finished(self, future: Future) -> None:
    """Handle the result of ``_add_torrent_worker`` on the GUI thread.

    On success all per-download state is reset and polling starts. The
    cached ``last_known_state`` is reset too: a stale "complete" left by a
    previous download would otherwise make an early Pause click discard the
    new task. On failure the duplicate-torrent case is tried first; if that
    does not apply, the status line is updated and an error dialog is shown.
    """
    self._set_add_busy(False)
    ok, payload, mode = future.result()
    if ok:
        self.gid_current = payload
        self.is_paused = False
        self.last_known_state = ""
        self.last_completed_length = 0
        self.last_speed_ts = time.time()
        self.speed_ema = 0.0
        self.no_seed_streak = 0
        self.pause_button.setText("⏸ Pause")
        self.status_label.setText(f"Mode '{mode}' started")
        self.timer.start()
        if self.turbo_sticky_enabled:
            profile, global_opts, task_opts = self._turbo_profile(speed_bps=0, peers=0, seeders=0)
            self._start_turbo_apply(show_popup=False, profile=profile, global_opts=global_opts, task_opts=task_opts)
            if not self.turbo_timer.isActive():
                self.turbo_timer.start()
        return

    error_text = payload
    if "already registered" in error_text.lower() and self._handle_duplicate_torrent_error(error_text):
        return

    # Replace the "Adding torrent..." text left by _set_add_busy(True).
    self.status_label.setText("Failed to add torrent")
    QtWidgets.QMessageBox.critical(
        self,
        "Add error",
        (
            "Failed to add torrent after all automatic recovery attempts.\n\n"
            f"Details: {error_text}"
        ),
    )
def _poll_add_future(self) -> None:
    """Timer slot: hand a finished add-torrent future to its handler."""
    pending = self.add_future
    if pending is None:
        self.add_timer.stop()
        return
    if not pending.done():
        return
    self.add_future = None
    self.add_timer.stop()
    self._on_add_torrent_finished(pending)
def _peer_discovery_worker(self, gid: str) -> None:
    """Worker: best-effort option change to help a task with no seeders.

    Re-sends the tracker list and raises the peer limits for *gid*. Any
    failure is ignored on purpose; the caller triggers this periodically.
    """
    boost = {
        "bt-tracker": ",".join(TRACKERS),
        "bt-max-peers": "800",
        "bt-request-peer-speed-limit": "0",
        "bt-enable-peer-exchange": "true",
    }
    try:
        aria2_request(
            "changeOption",
            [gid, boost],
            rpc_endpoint=self.rpc_endpoint,
            timeout_s=1.5,
        )
    except Exception:
        pass
def _fetch_status_worker(self, gid: str, include_files: bool = False, include_peer_stats: bool = False) -> tuple[bool, object]:
    """Worker: fetch the status of *gid*, optionally with peer statistics.

    Args:
        gid: aria2 task id.
        include_files: also request the ``files`` key.
        include_peer_stats: batch ``getPeers`` with ``tellStatus`` in one
            ``system.multicall``.

    Returns:
        ``(True, status_dict)`` on success or ``(False, error_text)``.
        ``peerCount`` and ``peerSeederCount`` are added to the status only
        when a peer list was actually obtained. Otherwise they are left out
        so the consumer can fall back to aria2's ``connections`` field
        instead of seeing a bogus 0.

    A timed-out request is retried once.
    """
    keys = [
        "status",
        "completedLength",
        "totalLength",
        "downloadSpeed",
        "numSeeders",
        "connections",
        "errorCode",
        "errorMessage",
    ]
    if include_files:
        keys.append("files")

    for _ in range(2):
        try:
            peers = None
            if include_peer_stats:
                calls = [
                    {"methodName": "aria2.tellStatus", "params": [gid, keys]},
                    {"methodName": "aria2.getPeers", "params": [gid]},
                ]
                multi = aria2_raw_request("system.multicall", [calls], rpc_endpoint=self.rpc_endpoint, timeout_s=STATUS_TIMEOUT_S)["result"]
                entries = multi if isinstance(multi, list) else []

                # Each multicall element is a one-item list on success or a
                # fault struct (dict) when that call failed.
                head = entries[0] if entries else None
                if not (isinstance(head, list) and head and isinstance(head[0], dict)):
                    fault = head.get("message", "") if isinstance(head, dict) else ""
                    return False, str(fault or "tellStatus failed inside multicall")
                status = head[0]

                tail = entries[1] if len(entries) > 1 else None
                if isinstance(tail, list) and tail and isinstance(tail[0], list):
                    peers = tail[0]
            else:
                status = aria2_request(
                    "tellStatus",
                    [gid, keys],
                    rpc_endpoint=self.rpc_endpoint,
                    timeout_s=STATUS_TIMEOUT_S,
                )["result"]

            if peers is not None:
                status["peerCount"] = len(peers)
                status["peerSeederCount"] = sum(
                    1
                    for p in peers
                    if isinstance(p, dict) and str(p.get("seeder", "")).lower() in {"true", "1"}
                )
            return True, status
        except Exception as exc:
            last = str(exc)
            if "timed out" in last.lower():
                time.sleep(0.05)
                continue
            return False, last
    return False, "timed out"
def _on_status_fetched(self, future: Future) -> None:
    """Apply a finished status request to the UI and react to state changes.

    Updates progress, the speed and status lines and the file name, submits
    peer-discovery work while no seeders are visible, adapts the polling
    interval, and finalises the task on ``complete`` or ``error``.

    The displayed speed is an exponential moving average of the measured
    byte delta. The first sample of a task has no previous byte count, so
    aria2's own ``downloadSpeed`` is used for it. Measuring from zero would
    turn the already-downloaded bytes of a resumed task into a huge spike.
    """
    ok, payload = future.result()
    if not ok:
        if "timed out" in str(payload).lower():
            self.status_timeout_streak += 1
            # Only mention slowness after several consecutive timeouts.
            if self.status_timeout_streak >= 6:
                self.status_label.setText("RPC is slow, retrying...")
            return
        self.status_timeout_streak = 0
        self.status_label.setText(f"Request error: {payload}")
        return

    self.status_timeout_streak = 0
    status = payload
    self.last_known_state = str(status.get("status", ""))
    comp = int(status.get("completedLength", "0"))
    total = max(1, int(status.get("totalLength", "1")))
    reported_speed = int(status.get("downloadSpeed", "0"))
    progress = min(100, int((comp / total) * 100))

    now = time.time()
    dt = max(0.001, now - self.last_speed_ts)
    if self.last_completed_length <= 0:
        # No baseline yet (fresh or resumed task): trust aria2's figure.
        observed_speed = float(max(0, reported_speed))
    else:
        observed_speed = max(0.0, (comp - self.last_completed_length) / dt)
    self.last_completed_length = comp
    self.last_speed_ts = now
    self.speed_ema = observed_speed if self.speed_ema <= 0 else (self.speed_ema * 0.7 + observed_speed * 0.3)
    display_speed = int(self.speed_ema)

    seeders_rpc = int(str(status.get("numSeeders", "0")) or "0")
    seeders_peer = int(str(status.get("peerSeederCount", "0")) or "0")
    seeders = max(seeders_rpc, seeders_peer)
    peers = int(str(status.get("peerCount", status.get("connections", "0"))) or "0")

    self.progress.setValue(progress)
    speed_line = f"{human_bytes(display_speed)}/s (real) | peers: {peers} | seeders: {seeders}"
    if speed_line != self.last_speed_line:
        self.speed_label.setText(speed_line)
        self.last_speed_line = speed_line

    state = status.get("status", "")

    # At most one peer-discovery job at a time, so the small worker pool is
    # not flooded.
    if self.peer_discovery_future and self.peer_discovery_future.done():
        self.peer_discovery_future = None
    if state == "active" and seeders == 0:
        self.no_seed_streak += 1
        if self.no_seed_streak % 12 == 0 and self.gid_current and self.peer_discovery_future is None:
            self.peer_discovery_future = self.executor.submit(self._peer_discovery_worker, self.gid_current)
    else:
        self.no_seed_streak = 0

    if self.turbo_sticky_enabled and state == "active":
        self._schedule_turbo_maintenance(display_speed, peers, seeders)

    status_line = f"{state} | {progress}% | raw: {human_bytes(reported_speed)}/s"
    if status_line != self.last_status_line:
        self.status_label.setText(status_line)
        self.last_status_line = status_line

    # Poll faster while data is flowing at 1 MiB/s or more.
    target_interval = 550 if state == "active" and display_speed >= 1024 * 1024 else 900
    if self.timer.interval() != target_interval:
        self.timer.setInterval(target_interval)

    if status.get("files"):
        file_name = Path(status["files"][0].get("path", "")).name
        self.file_label.setText(f"📄 {file_name}")

    if state == "complete":
        self.timer.stop()
        self.turbo_sticky_enabled = False
        self.turbo_timer.stop()
        self.turbo_poll_timer.stop()
        self.gid_current = None
        self.is_paused = False
        self.pause_button.setText("▶️ Start")
        QtWidgets.QMessageBox.information(self, "Done", f"File downloaded to\n{self.download_dir}")
        webbrowser.open(str(self.download_dir))
        return

    if state == "error":
        self.timer.stop()
        self.turbo_sticky_enabled = False
        self.turbo_timer.stop()
        self.turbo_poll_timer.stop()
        error_code = str(status.get("errorCode", ""))
        error_message = str(status.get("errorMessage", "Unknown aria2 error."))
        # Code 12 is handled as "already registered": attach to that task.
        if error_code == "12" and self._handle_duplicate_torrent_error(error_message):
            return
        self.gid_current = None
        self.is_paused = False
        self.pause_button.setText("▶️ Start")
        QtWidgets.QMessageBox.critical(
            self,
            "Download error",
            (
                "Download failed.\n\n"
                f"aria2 code: {error_code or 'n/a'}\n"
                f"aria2 message: {error_message}\n\n"
                "Tips:\n"
                "- Try another speed mode (Safe/Balanced).\n"
                "- Verify torrent has active seeders.\n"
                "- Try Reconnect RPC, then start again."
            ),
        )
def choose_torrent(self) -> bool:
    """Ask for a .torrent file and queue it for adding.

    Returns True if an add job was submitted, False if one is already
    running or the dialog was cancelled.
    """
    if self.add_in_flight:
        return False
    selected, _filter = QtWidgets.QFileDialog.getOpenFileName(
        self,
        "Select torrent",
        str(Path.home()),
        "Torrent (*.torrent)",
    )
    if not selected:
        return False
    chosen_mode = self.mode_box.currentText()
    self._set_add_busy(True)
    self.add_future = self.executor.submit(self._add_torrent_worker, Path(selected), chosen_mode)
    self.add_timer.start()
    return True
def update_progress(self) -> None:
    """Timer slot: consume a finished status request or submit the next one.

    Only one status request is in flight at a time. Every 6th request also
    asks for the file list and every 10th for peer statistics.
    """
    gid = self.gid_current
    if not gid:
        return

    pending = self.status_future
    if pending and pending.done():
        self.status_future = None
        self.progress_request_in_flight = False
        self._on_status_fetched(pending)
        return

    if self.status_future or self.progress_request_in_flight:
        return
    self.progress_request_in_flight = True
    tick = self.status_tick
    self.status_tick = tick + 1
    self.status_future = self.executor.submit(
        self._fetch_status_worker,
        gid,
        tick % 6 == 0,
        tick % 10 == 0,
    )
def _fetch_gid_state(self) -> Optional[str]:
    """Return the status string of the current task, or None.

    None is returned when no task is tracked or the RPC call fails.
    """
    gid = self.gid_current
    if not gid:
        return None
    try:
        reply = self.rpc_call_fast("tellStatus", [gid, ["status"]])
        return str(reply["result"].get("status", ""))
    except Exception:
        return None
def start_pause_resume(self) -> None:
    """Button slot: start a new download or toggle pause on the current one.

    With no tracked task the file chooser is opened. Otherwise the cached
    state (or a fresh RPC query when none is cached) decides whether to
    pause or unpause. After a successful toggle the cached state is updated
    immediately, so a second click before the next status poll does not
    repeat the same command.
    """
    if not self.gid_current:
        self.choose_torrent()
        return

    state = self.last_known_state or self._fetch_gid_state()
    if state in {None, "", "error", "complete", "removed"}:
        self.gid_current = None
        self.is_paused = False
        self.pause_button.setText("▶️ Start")
        QtWidgets.QMessageBox.information(
            self,
            "No active download",
            "Previous task is no longer active. Select a torrent to start a new download.",
        )
        return

    try:
        if state == "paused":
            self.rpc_call("unpause", [self.gid_current])
            self.pause_button.setText("⏸ Pause")
            self.is_paused = False
            self.last_known_state = "active"
        elif state == "active":
            self.rpc_call("pause", [self.gid_current])
            self.pause_button.setText("▶️ Resume")
            self.is_paused = True
            self.last_known_state = "paused"
        elif state == "waiting":
            # A queued task often cannot be paused/unpaused yet, so no
            # command is sent.
            self.pause_button.setText("▶️ Resume")
            self.is_paused = True
            QtWidgets.QMessageBox.information(self, "Task state", "Task is waiting in queue. Pause/Resume is not available yet.")
        else:
            self.is_paused = (state == "paused")
            QtWidgets.QMessageBox.information(self, "Task state", f"Current task state: {state}")
    except Exception as exc:
        QtWidgets.QMessageBox.critical(self, "Error", f"Failed to switch state:\n{exc}")
def _turbo_profile(self, speed_bps: int, peers: int, seeders: int) -> tuple[str, Dict[str, str], Dict[str, str]]:
    """Pick a turbo profile from the current swarm figures.

    Returns ``(profile_name, global_options, task_options)``:

    * ``"bootstrap"`` when fewer than 8 peers or no seeders are known,
    * ``"throughput"`` when the speed exceeds 8 MiB/s,
    * ``"balanced"`` otherwise.
    """
    tracker_csv = ",".join(TRACKERS)
    global_opts = {
        "max-overall-download-limit": "0",
        "max-overall-upload-limit": "0",
        "max-concurrent-downloads": "6",
        "bt-max-open-files": "256",
        "bt-tracker": tracker_csv,
    }
    # Appended to every profile's task options.
    shared_task_opts = {
        "max-download-limit": "0",
        "max-upload-limit": "0",
        "bt-tracker": tracker_csv,
    }

    if peers < 8 or seeders == 0:
        bootstrap = {
            "max-connection-per-server": "12",
            "split": "12",
            "bt-max-peers": "320",
            "bt-request-peer-speed-limit": "0",
            "bt-enable-peer-exchange": "true",
            "bt-tracker-connect-timeout": "6",
            "bt-tracker-timeout": "6",
            "min-split-size": "2M",
        }
        return "bootstrap", global_opts, {**bootstrap, **shared_task_opts}

    if speed_bps > 8 * 1024 * 1024:
        throughput = {
            "max-connection-per-server": "16",
            "split": "24",
            "bt-max-peers": "500",
            "bt-request-peer-speed-limit": "0",
            "min-split-size": "4M",
        }
        return "throughput", global_opts, {**throughput, **shared_task_opts}

    balanced = {
        "max-connection-per-server": "14",
        "split": "16",
        "bt-max-peers": "380",
        "bt-request-peer-speed-limit": "0",
        "min-split-size": "2M",
    }
    return "balanced", global_opts, {**balanced, **shared_task_opts}
def _apply_turbo_worker(self, gid: Optional[str], global_opts: Dict[str, str], task_opts: Dict[str, str]) -> tuple[bool, bool, list[str], str]:
    """Push turbo options to aria2; never raises ``Exception`` to the caller.

    A single ``system.multicall`` is tried first. If that request raises,
    the same changes are sent as individual RPC calls instead.

    Args:
        gid: aria2 download id to tune, or ``None``/empty to change only the
            global options.
        global_opts: Options for ``changeGlobalOption``.
        task_opts: Options for ``changeOption`` on ``gid``.

    Returns:
        ``(ok, task_applied, skipped, error)``. ``skipped`` is always an
        empty list here; ``error`` is ``""`` on success and ``str(exc)``
        when the fallback calls fail as well.
    """
    try:
        batch = [{"methodName": "aria2.changeGlobalOption", "params": [global_opts]}]
        if gid:
            batch += [{"methodName": "aria2.changeOption", "params": [gid, task_opts]}]
        try:
            aria2_raw_request("system.multicall", [batch], rpc_endpoint=self.rpc_endpoint, timeout_s=1.8)
        except Exception:
            # Batched call failed: retry each change as its own request.
            # NOTE(review): aria2_request presumably adds the "aria2."
            # method prefix itself - confirm against its definition.
            aria2_request("changeGlobalOption", [global_opts], rpc_endpoint=self.rpc_endpoint, timeout_s=1.8)
            if gid:
                aria2_request("changeOption", [gid, task_opts], rpc_endpoint=self.rpc_endpoint, timeout_s=1.8)
    except Exception as exc:
        return False, False, [], str(exc)
    else:
        return True, bool(gid), [], ""
def _start_turbo_apply(self, show_popup: bool, profile: str, global_opts: Dict[str, str], task_opts: Dict[str, str]) -> None:
if self.turbo_apply_in_flight:
return
self.turbo_apply_in_flight = True
self.turbo_apply_show_popup = show_popup
gid = self.gid_current
self.turbo_apply_future = self.executor.submit(self._apply_turbo_worker, gid, global_opts, task_opts)
self.turbo_last_profile = profile
if not self.turbo_poll_timer.isActive():
self.turbo_poll_timer.start()
def _poll_turbo_future(self) -> None:
if not self.turbo_apply_future:
self.turbo_apply_in_flight = False
self.turbo_poll_timer.stop()
return
if not self.turbo_apply_future.done():
return
fut = self.turbo_apply_future
self.turbo_apply_future = None
self.turbo_apply_in_flight = False
self.turbo_poll_timer.stop()
ok, task_applied, skipped, err = fut.result()
if not ok:
self.turbo_fail_streak += 1
self.turbo_backoff_until = time.time() + TURBO_FAIL_BACKOFF_S
if self.turbo_apply_show_popup:
QtWidgets.QMessageBox.critical(self, "Error", err)
return
self.turbo_fail_streak = 0
self.turbo_backoff_until = 0.0
self.turbo_last_apply_ts = time.time()
self.turbo_last_maintenance_ts = self.turbo_last_apply_ts
if task_applied:
self.status_label.setText(f"Turbo active ({self.turbo_last_profile})")
else:
self.status_label.setText("Turbo active globally (waiting for task)")
if self.turbo_apply_show_popup:
QtWidgets.QMessageBox.information(
self,
"Turbo",
"Smart Turbo enabled: lightweight adaptive profile with low RPC overhead.",
)
def _schedule_turbo_maintenance(self, speed_bps: int, peers: int, seeders: int) -> None:
now = time.time()
if self.turbo_apply_in_flight:
return
if now < self.turbo_backoff_until:
return
profile, global_opts, task_opts = self._turbo_profile(speed_bps, peers, seeders)
profile_changed = profile != self.turbo_last_profile
periodic_due = (now - self.turbo_last_maintenance_ts) >= TURBO_MAINTENANCE_INTERVAL_S
if profile_changed or periodic_due:
self._start_turbo_apply(show_popup=False, profile=profile, global_opts=global_opts, task_opts=task_opts)
def _maintain_turbo(self) -> None:
if not self.turbo_sticky_enabled:
return
if not self.gid_current:
return
state = self.last_known_state
if state in {"complete", "error", "removed"}:
self.turbo_sticky_enabled = False
self.turbo_timer.stop()
return
def apply_turbo(self) -> None:
    """Enable sticky turbo and start an initial apply with a popup.

    The first profile is computed from all-zero statistics, then
    ``turbo_timer`` is started if it is not already running and the hint
    panel is refreshed.
    """
    self.turbo_sticky_enabled = True

    first_profile, global_opts, task_opts = self._turbo_profile(speed_bps=0, peers=0, seeders=0)
    self._start_turbo_apply(
        show_popup=True,
        profile=first_profile,
        global_opts=global_opts,
        task_opts=task_opts,
    )

    maintenance_timer = self.turbo_timer
    if not maintenance_timer.isActive():
        maintenance_timer.start()

    self.refresh_speed_hints()
def refresh_speed_hints(self) -> None:
    """Fill the hints widget with the fixed, numbered Smart Turbo notes."""
    hint_lines = (
        "1) Smart Turbo applies a low-overhead profile and adapts only when needed.",
        "2) It uses multicall batching when available to cut RPC round-trips.",
        "3) If RPC is unstable, turbo maintenance enters backoff automatically.",
        "4) Throughput profile is used only when peers are healthy and speed is already high.",
        "5) If port 6800 is busy, the app will automatically choose another RPC port.",
    )
    hint_text = "\n".join(hint_lines)
    self.hints.setPlainText(hint_text)
def closeEvent(self, event) -> None:  # type: ignore[override]
    """Stop timers and background work before the window closes.

    All timers are stopped, the executor is shut down without waiting
    (queued futures are cancelled), and the aria2 process is terminated
    only when this window owns it and it is still running. The event is
    then passed to the base-class handler.
    """
    for timer_name in (
        "timer",
        "turbo_timer",
        "turbo_poll_timer",
        "startup_timer",
        "reconnect_timer",
    ):
        getattr(self, timer_name).stop()

    self.executor.shutdown(wait=False, cancel_futures=True)

    if self.owns_aria2_process:
        proc = self.aria2_proc
        # poll() returns None while the child process is still alive.
        if proc and proc.poll() is None:
            proc.terminate()

    super().closeEvent(event)
def main() -> None:
    """Create the Qt application, show the main window and run the event loop.

    If constructing the window raises, the error is shown in a message box
    and the process exits with status 1. Otherwise the process exits with
    the value returned by the event loop.
    """
    app = QtWidgets.QApplication(sys.argv)

    try:
        window = TorrentWindow()
    except Exception as startup_error:
        QtWidgets.QMessageBox.critical(None, "Startup error", str(startup_error))
        sys.exit(1)

    window.show()
    exit_code = app.exec()
    sys.exit(exit_code)


# Run the GUI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Source code is fully readable — audit it, tweak the speed modes, add trackers, change the download folder. It’s yours.
Quick Hits
| Want | Do |
|---|---|
| Download a torrent | Run script → Select .torrent → done |
| Maximum speed | Switch to Aggressive mode + hit Apply Turbo |
| Resume a download later | Click Pause → close app → reopen → load same .torrent |
| Audit the code | It’s ~1100 lines of Python — no hidden magic |
One script. Zero bloat. Your downloads, your rules.



!