⚡ Torrent Downloader Built From Scratch — No uTorrent Nonsense

🧲 Clean Torrent Downloader — No Bloat, No Bundled Malware, Just Downloads (updated 11.03.2026)

A one-script torrent client built from scratch. No uTorrent adware. No qBittorrent config rabbit holes. Just pick a file and go.

Most torrent clients install toolbars, crypto miners, or “recommended software” you never asked for. This one is a single Python script — open source, no installer, no background processes, no surprises.

Think of it as a remote control for aria2 — the fastest download engine that already lives on your machine (or installs in one command). The GUI just makes it human-friendly.


🖥️ What It Looks Like

Waiting for a torrent — clean, simple interface:

Downloading at 9.9 MB/s in Aggressive mode — 82 peers connected:

Three speed modes to match your connection:

⚡ Why This Instead of uTorrent / qBittorrent / BiglyBT
| Problem With Others | This Tool |
| --- | --- |
| Bundled adware / toolbars | Zero installs — one .py file, nothing else |
| Background processes eating RAM | Runs only when you open it, dies when you close it |
| Confusing settings pages | 3 speed modes: Safe, Balanced, Aggressive — pick one |
| Forced updates / premium nags | Open source — no accounts, no subscriptions, no nagging |
| Crypto miners hiding in installers | Readable source code — see exactly what runs |
🛠️ How to Use It (3 Steps)

Step 1 — Install the engine (one time only)

| OS | Command |
| --- | --- |
| Windows | `choco install aria2` or download from aria2 GitHub releases |
| Linux | `sudo apt install aria2` |
| Mac | `brew install aria2` |

You also need PyQt6: `pip install PyQt6`

Step 2 — Run the script

Save the code below as torrent.py, then:

python torrent.py

Step 3 — Download

Click Select .torrent → pick your .torrent file → choose a speed mode → watch it fly.

⚡ Resume support is built in. Close the app mid-download, reopen it, and load the same torrent — it picks up where you left off. Delete the status files in your Downloads folder to start fresh.

📋 The Full Source Code
#!/usr/bin/env python3
"""Torrent downloader GUI powered by PyQt6 + aria2 RPC."""

from __future__ import annotations

import base64
import json
import os
import shutil
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
import webbrowser
from concurrent.futures import Future, ThreadPoolExecutor
from pathlib import Path
from typing import Dict, Optional

from PyQt6 import QtCore, QtWidgets

# When False, build_options() removes the Aggressive mode's
# "bt-request-peer-speed-limit" entry before the options are sent to aria2.
FORCE_UNLIMITED_PEER_SPEED = False
# Host used by rpc_url() to build every JSON-RPC endpoint URL.
DEFAULT_RPC_HOST = "127.0.0.1"
# Ports probed for an already-running aria2 RPC server and tried, in this
# order, when spawning a local aria2 process.
PREFERRED_PORTS = (6800, 6801, 6802, 6803, 6880, 6881, 6969, 8080, 8081, 5000, 8888, 9090, 10000, 16800)
# Timeout (seconds) for the quick "getVersion" liveness probes.
RPC_PROBE_TIMEOUT_S = 0.2
# Timeout (seconds) for the status requests issued by _fetch_status_worker().
STATUS_TIMEOUT_S = 1.8
# Default timeout (seconds) of TorrentWindow.rpc_call_fast().
RPC_FAST_TIMEOUT_S = 1.2
# NOTE(review): the two TURBO_* values are not referenced in the visible part
# of the file; presumably the turbo maintenance cadence and failure backoff in
# seconds — confirm against the turbo helpers.
TURBO_MAINTENANCE_INTERVAL_S = 45.0
TURBO_FAIL_BACKOFF_S = 90.0

# Speed presets shown in the mode combo box, keyed by their display label.
# Each preset maps aria2 option names to string values; build_options() merges
# the chosen preset into the per-download options and clamps
# "max-connection-per-server" to 1..16, "split" to 1..32 and "bt-max-peers"
# to at least 50.
MODES: Dict[str, Dict[str, str]] = {
    "🐢 Safe": {
        "max-connection-per-server": "4",
        "split": "4",
        "bt-max-peers": "70",
        "bt-request-peer-speed-limit": "64K",
    },
    "⚖️ Balanced": {
        "max-connection-per-server": "16",
        "split": "16",
        "bt-max-peers": "220",
        "bt-request-peer-speed-limit": "32K",
    },
    "🔥 Aggressive": {
        "max-connection-per-server": "16",
        "split": "32",
        "bt-max-peers": "600",
        # Dropped by build_options() unless FORCE_UNLIMITED_PEER_SPEED is True.
        "bt-request-peer-speed-limit": "0",
    },
}

# Extra tracker announce URLs. They are joined with commas and passed to aria2
# as the "bt-tracker" option (command line, per-download options and the peer
# discovery refresh).
# NOTE(review): availability of these public trackers is not checked anywhere
# in the visible code — verify the list is still current.
TRACKERS = [
    "udp://tracker.opentrackr.org:1337/announce",
    "udp://open.demonii.com:1337/announce",
    "udp://tracker.torrent.eu.org:451/announce",
    "udp://tracker.coppersurfer.tk:6969/announce",
    "udp://tracker.openbittorrent.com:6969/announce",
    "udp://tracker.zer0day.to:1337/announce",
    "udp://tracker.cyberia.is:6969/announce",
    "udp://explodie.org:6969/announce",
    "udp://tracker.moeking.me:6969/announce",
    "udp://9.rarbg.com:2710/announce",
    "udp://tracker.dler.org:6969/announce",
    "udp://tracker1.bt.moack.co.kr:80/announce",
    "udp://tracker.srv00.com:6969/announce",
]


def resolve_download_dir() -> Path:
    """Return the user's ``Downloads`` directory, creating it if necessary.

    The directory is looked up under ``%USERPROFILE%`` first and under
    ``Path.home()`` when that variable is unset or empty. A small probe file
    is written and removed to verify the directory is writable; any failure
    of that probe propagates to the caller.
    """
    profile_root = Path(os.environ.get("USERPROFILE", "")).expanduser()
    target = profile_root / "Downloads"
    # An empty USERPROFILE collapses to the relative path "Downloads".
    if str(target).strip() in ("", "Downloads"):
        target = Path.home() / "Downloads"

    target.mkdir(parents=True, exist_ok=True)

    marker = target / ".write_test.tmp"
    marker.write_text("ok", encoding="utf-8")
    marker.unlink(missing_ok=True)
    return target


def rpc_url(port: int) -> str:
    """Return the aria2 JSON-RPC endpoint URL on DEFAULT_RPC_HOST for *port*."""
    return "http://{}:{}/jsonrpc".format(DEFAULT_RPC_HOST, port)


def aria2_request(
    method: str,
    params: Optional[list] = None,
    rpc_endpoint: str = rpc_url(6800),
    timeout_s: float = 8.0,
) -> dict:
    """Call a method in the ``aria2.`` namespace over JSON-RPC.

    Args:
        method: Method name without the ``aria2.`` prefix (e.g. ``"getVersion"``).
        params: Positional RPC parameters; ``None`` is sent as an empty list.
        rpc_endpoint: Full URL of the JSON-RPC endpoint.
        timeout_s: Timeout in seconds for the HTTP request.

    Returns:
        The decoded JSON-RPC response object.

    Raises:
        RuntimeError: On HTTP/transport failures or when the response
            carries an ``"error"`` member.
    """
    # Only the namespace prefix differs from aria2_raw_request(); delegating
    # keeps the request building and error translation in one place.
    return aria2_raw_request(f"aria2.{method}", params, rpc_endpoint, timeout_s)


def aria2_raw_request(
    method: str,
    params: Optional[list] = None,
    rpc_endpoint: str = rpc_url(6800),
    timeout_s: float = 8.0,
) -> dict:
    """Send one JSON-RPC 2.0 request with a fully qualified method name.

    Args:
        method: Complete method name, e.g. ``"system.multicall"``.
        params: Positional RPC parameters; ``None`` is sent as an empty list.
        rpc_endpoint: Full URL of the JSON-RPC endpoint.
        timeout_s: Timeout in seconds for the HTTP request.

    Returns:
        The decoded JSON-RPC response object.

    Raises:
        RuntimeError: For HTTP error statuses, transport failures (including
            read timeouts and connection resets), responses that are not a
            JSON object, and JSON-RPC ``"error"`` replies.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": "req",
        "method": method,
        "params": params or [],
    }
    req = urllib.request.Request(
        rpc_endpoint,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="ignore")
        raise RuntimeError(f"HTTP {exc.code} {exc.reason}\n{body}") from None
    except urllib.error.URLError as exc:
        raise RuntimeError(f"RPC unavailable ({rpc_endpoint}): {exc.reason}") from None
    except OSError as exc:
        # Failures while reading the body (socket timeout, connection reset)
        # are raised directly instead of being wrapped in URLError.
        raise RuntimeError(f"RPC unavailable ({rpc_endpoint}): {exc}") from None
    except ValueError as exc:
        # json.loads() rejects malformed or undecodable payloads with ValueError.
        raise RuntimeError(f"Invalid RPC response from {rpc_endpoint}: {exc}") from None

    if not isinstance(data, dict):
        raise RuntimeError(f"Invalid RPC response from {rpc_endpoint}: expected a JSON object")
    if "error" in data:
        message = data["error"].get("message", "Unknown error")
        raise RuntimeError(message)
    return data


def human_bytes(value: str | int) -> str:
    n = float(int(value))
    for unit in ["B", "KB", "MB", "GB", "TB"]:
        if n < 1024:
            return f"{n:.1f} {unit}"
        n /= 1024
    return f"{n:.1f} PB"


def pick_free_port(start: int = 6800, stop: int = 6999) -> int:
    """Return the first port in ``start..stop`` (inclusive) nothing answers on.

    A port counts as free when a TCP connect to DEFAULT_RPC_HOST on that port
    fails.

    Raises:
        RuntimeError: If every port in the range accepted the connection
            (or the range is empty).
    """
    for port in range(start, stop + 1):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # connect_ex() returns 0 only when something accepted the connection.
            if sock.connect_ex((DEFAULT_RPC_HOST, port)) != 0:
                return port
    # Report the range that was actually scanned, not the default one.
    raise RuntimeError(f"Unable to find a free RPC port ({start}-{stop}).")


class TorrentWindow(QtWidgets.QWidget):
    def __init__(self) -> None:
        """Build the window, initialise state and start the async RPC bootstrap."""
        super().__init__()
        # --- download / RPC connection state ---
        self.download_dir = resolve_download_dir()
        self.aria2_proc: Optional[subprocess.Popen] = None
        # True only while self.aria2_proc was spawned by this window.
        self.owns_aria2_process = False
        # gid of the download currently shown in the UI, if any.
        self.gid_current: Optional[str] = None
        self.is_paused = False
        self.rpc_port = 6800
        self.rpc_endpoint = rpc_url(self.rpc_port)
        # Worker pool for blocking RPC/process work; the futures below are
        # polled by the QTimers created further down.
        self.executor = ThreadPoolExecutor(max_workers=3)
        self.add_in_flight = False
        self.progress_request_in_flight = False
        self.add_future: Optional[Future] = None
        self.status_future: Optional[Future] = None
        # --- status polling / speed estimation state ---
        self.status_tick = 0
        self.status_timeout_streak = 0
        self.last_known_state = ""
        self.last_completed_length = 0
        self.last_speed_ts = time.time()
        # Exponential moving average of the observed download speed (bytes/s).
        self.speed_ema = 0.0
        self.no_seed_streak = 0
        # --- turbo tuning state ---
        self.turbo_sticky_enabled = False
        self.turbo_last_apply_ts = 0.0
        self.turbo_apply_in_flight = False
        self.turbo_apply_future: Optional[Future] = None
        self.turbo_apply_show_popup = False
        self.turbo_fail_streak = 0
        self.turbo_backoff_until = 0.0
        self.turbo_last_profile = ""
        self.turbo_last_maintenance_ts = 0.0
        # --- other background futures ---
        self.peer_discovery_future: Optional[Future] = None
        self.startup_future: Optional[Future] = None
        self.reconnect_future: Optional[Future] = None
        # Last texts written to the labels, kept to skip redundant setText calls.
        self.last_status_line = ""
        self.last_speed_line = ""

        self.setWindowTitle("📥 Torrent Downloader (PyQt6)")
        self.setFixedSize(620, 450)

        # --- widgets ---
        self.status_label = QtWidgets.QLabel("Waiting for .torrent...")
        self.rpc_label = QtWidgets.QLabel("RPC: checking...")
        self.progress = QtWidgets.QProgressBar()
        self.progress.setRange(0, 100)
        self.speed_label = QtWidgets.QLabel("0 B/s")
        self.file_label = QtWidgets.QLabel("")

        self.mode_box = QtWidgets.QComboBox()
        self.mode_box.addItems(MODES.keys())
        self.mode_box.setCurrentText("⚖️ Balanced")

        self.select_button = QtWidgets.QPushButton("📂 Select .torrent")
        self.pause_button = QtWidgets.QPushButton("▶️ Start")
        self.open_button = QtWidgets.QPushButton("📁 Open folder")
        self.tune_button = QtWidgets.QPushButton("⚡ Apply turbo settings")
        self.reconnect_button = QtWidgets.QPushButton("🔌 Reconnect RPC")

        self.hints = QtWidgets.QPlainTextEdit()
        self.hints.setReadOnly(True)
        self.hints.setMaximumHeight(160)

        # --- layout (top to bottom) ---
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(self.status_label)
        layout.addWidget(self.rpc_label)
        layout.addWidget(self.progress)
        layout.addWidget(self.speed_label)
        layout.addWidget(self.file_label)

        row = QtWidgets.QHBoxLayout()
        row.addWidget(QtWidgets.QLabel("⚙️ Speed mode:"))
        row.addWidget(self.mode_box)
        layout.addLayout(row)

        buttons = QtWidgets.QHBoxLayout()
        buttons.addWidget(self.select_button)
        buttons.addWidget(self.pause_button)
        buttons.addWidget(self.open_button)
        layout.addLayout(buttons)

        rpc_buttons = QtWidgets.QHBoxLayout()
        rpc_buttons.addWidget(self.tune_button)
        rpc_buttons.addWidget(self.reconnect_button)
        layout.addLayout(rpc_buttons)

        layout.addWidget(QtWidgets.QLabel(f"📁 Downloading to: {self.download_dir}"))
        layout.addWidget(QtWidgets.QLabel("Speed diagnostics:"))
        layout.addWidget(self.hints)

        # --- timers (intervals in milliseconds) ---
        # Progress refresh; started once a download is attached.
        self.timer = QtCore.QTimer(self)
        self.timer.setInterval(700)
        self.timer.timeout.connect(self.update_progress)

        # Polls the "add torrent" future.
        self.add_timer = QtCore.QTimer(self)
        self.add_timer.setInterval(60)
        self.add_timer.timeout.connect(self._poll_add_future)

        # Periodic turbo maintenance.
        self.turbo_timer = QtCore.QTimer(self)
        self.turbo_timer.setInterval(20000)
        self.turbo_timer.timeout.connect(self._maintain_turbo)

        # Polls the turbo-apply future.
        self.turbo_poll_timer = QtCore.QTimer(self)
        self.turbo_poll_timer.setInterval(120)
        self.turbo_poll_timer.timeout.connect(self._poll_turbo_future)

        # Polls the RPC bootstrap future.
        self.startup_timer = QtCore.QTimer(self)
        self.startup_timer.setInterval(80)
        self.startup_timer.timeout.connect(self._poll_startup_future)

        # Polls the reconnect future.
        self.reconnect_timer = QtCore.QTimer(self)
        self.reconnect_timer.setInterval(100)
        self.reconnect_timer.timeout.connect(self._poll_reconnect_future)

        # --- button wiring ---
        self.select_button.clicked.connect(self.choose_torrent)
        self.pause_button.clicked.connect(self.start_pause_resume)
        self.open_button.clicked.connect(lambda: webbrowser.open(str(self.download_dir)))
        self.tune_button.clicked.connect(self.apply_turbo)
        self.reconnect_button.clicked.connect(self.reconnect_rpc)

        # Controls stay disabled until the background RPC bootstrap succeeds.
        self._set_rpc_controls_ready(False)
        self._start_rpc_bootstrap_async()
        self.refresh_speed_hints()

    def _set_rpc_controls_ready(self, ready: bool) -> None:
        """Enable or disable the RPC-dependent buttons.

        Select/pause additionally require that no torrent add is in flight;
        the reconnect button depends only on that add-in-flight state.
        """
        idle = not self.add_in_flight
        can_control = ready and idle
        self.select_button.setEnabled(can_control)
        self.pause_button.setEnabled(can_control)
        self.tune_button.setEnabled(ready)
        self.reconnect_button.setEnabled(idle)

    def _start_rpc_bootstrap_async(self) -> None:
        """Submit the aria2 connect/start job to the executor unless one is pending."""
        pending = self.startup_future
        if pending is not None and not pending.done():
            return  # a bootstrap is already running

        self.rpc_label.setText("RPC: connecting asynchronously...")
        self.startup_future = self.executor.submit(self.start_or_connect_aria2, False)
        if self.startup_timer.isActive():
            return
        self.startup_timer.start()

    def _poll_startup_future(self) -> None:
        """Timer slot: consume the bootstrap future once it has finished."""
        fut = self.startup_future
        if fut is None:
            self.startup_timer.stop()
            return
        if not fut.done():
            return  # still running; check again on the next tick

        self.startup_future = None
        self.startup_timer.stop()
        try:
            fut.result()  # re-raises whatever the worker raised
            self._set_rpc_controls_ready(True)
            self.rpc_label.setText(f"RPC: connected on port {self.rpc_port}")
            self.status_label.setText("Ready")
        except Exception as exc:
            self._set_rpc_controls_ready(False)
            self.rpc_label.setText("RPC: unavailable")
            self.status_label.setText(f"RPC startup failed: {exc}")

    def rpc_call(self, method: str, params: Optional[list] = None) -> dict:
        """Call ``aria2.<method>`` on the current endpoint with the default timeout."""
        endpoint = self.rpc_endpoint
        return aria2_request(method, params, rpc_endpoint=endpoint)

    def rpc_call_fast(self, method: str, params: Optional[list] = None, timeout_s: float = RPC_FAST_TIMEOUT_S) -> dict:
        """Call ``aria2.<method>`` on the current endpoint with a short timeout."""
        endpoint = self.rpc_endpoint
        return aria2_request(method, params, rpc_endpoint=endpoint, timeout_s=timeout_s)

    def _connect_existing_rpc(self) -> bool:
        """Attach to the first port in PREFERRED_PORTS that answers ``getVersion``.

        Returns:
            True when an endpoint answered (``rpc_endpoint``/``rpc_port`` are
            updated), False when none did.

        Ownership is kept when the answering port is the one served by the
        aria2 process this window spawned and that process is still running.
        Dropping it there would make ``_stop_owned_aria2()`` skip the process
        and leave it running after a reconnect or restart.
        """
        own_proc = self.aria2_proc
        own_alive = bool(self.owns_aria2_process and own_proc is not None and own_proc.poll() is None)
        own_port = self.rpc_port

        for port in PREFERRED_PORTS:
            endpoint = rpc_url(port)
            try:
                aria2_request("getVersion", rpc_endpoint=endpoint, timeout_s=RPC_PROBE_TIMEOUT_S)
            except Exception:
                # Nothing usable on this port; try the next candidate.
                continue
            self.rpc_endpoint = endpoint
            self.rpc_port = port
            self.owns_aria2_process = own_alive and port == own_port
            return True
        return False

    def _stop_owned_aria2(self) -> None:
        if self.owns_aria2_process and self.aria2_proc and self.aria2_proc.poll() is None:
            self.aria2_proc.terminate()
            self.aria2_proc = None
        self.owns_aria2_process = False

    def _start_local_aria2(self, preferred_ports: Optional[list[int]] = None) -> None:
        """Spawn a local aria2 RPC server, trying profiles and ports in turn.

        Every command-line profile (turbo, then balanced, then minimal) is
        tried on every candidate port until one start succeeds.

        Raises:
            RuntimeError: If ``aria2c`` is not on PATH or no combination of
                profile and port produced a responding server.
        """
        if shutil.which("aria2c") is None:
            raise RuntimeError(
                "aria2c was not found in PATH. Install aria2 and try again.\n"
                "Windows: choco install aria2\n"
                "Ubuntu/Debian: sudo apt install aria2"
            )

        ports = list(preferred_ports or PREFERRED_PORTS)
        # Always keep one port that currently refuses connections as a last resort.
        spare = pick_free_port()
        if spare not in ports:
            ports.append(spare)

        failure = ""
        builders = (
            self._compatibility_profile_turbo,
            self._compatibility_profile_balanced,
            self._compatibility_profile_minimal,
        )
        for make_args in builders:
            for port in ports:
                try:
                    self._start_aria2_on_port(port, make_args(port))
                except Exception as exc:
                    failure = str(exc)
                else:
                    return

        raise RuntimeError(
            "Failed to start local aria2 RPC on all candidate ports.\n"
            f"Last error: {failure}"
        )

    def start_or_connect_aria2(
        self,
        prefer_local: bool = True,
        force_restart_local: bool = False,
        preferred_ports: Optional[list[int]] = None,
    ) -> None:
        """Make sure a usable aria2 RPC endpoint is selected.

        With ``prefer_local`` a dedicated local aria2 process is tried first;
        an already-running RPC server is only used when that fails (or when
        ``prefer_local`` is False). ``force_restart_local`` stops the process
        owned by this window before anything else.

        Raises:
            RuntimeError: From the final local start attempt when neither a
                local nor an existing endpoint is available.
        """
        if force_restart_local:
            self._stop_owned_aria2()

        if prefer_local:
            try:
                self._start_local_aria2(preferred_ports=preferred_ports)
            except Exception:
                # Local start impossible: fall through to an existing RPC server.
                pass
            else:
                return

        if not self._connect_existing_rpc():
            # Last resort: start locally again so the caller sees the detailed error.
            self._start_local_aria2(preferred_ports=preferred_ports)

    def _compatibility_profile_minimal(self, port: int) -> list[str]:
        return [
            "aria2c",
            "--enable-rpc",
            f"--rpc-listen-port={port}",
            "--rpc-listen-all=false",
            "--rpc-allow-origin-all",
            "--rpc-max-request-size=32M",
            f"--dir={self.download_dir}",
            "--file-allocation=none",
        ]

    def _compatibility_profile_balanced(self, port: int) -> list[str]:
        """Return the minimal command line extended with BitTorrent and limit options."""
        args = self._compatibility_profile_minimal(port)
        tracker_list = ",".join(TRACKERS)
        args.extend(
            [
                "--rpc-max-request-size=64M",
                "--allow-overwrite=true",
                "--continue=true",
                "--max-concurrent-downloads=5",
                "--bt-enable-lpd=true",
                "--enable-dht=true",
                "--enable-dht6=false",
                "--bt-enable-peer-exchange=true",
                "--seed-time=0",
                "--max-overall-download-limit=0",
                "--max-overall-upload-limit=0",
                f"--bt-tracker={tracker_list}",
            ]
        )
        return args

    def _compatibility_profile_turbo(self, port: int) -> list[str]:
        """Return the balanced command line plus the two extra turbo flags."""
        return [
            *self._compatibility_profile_balanced(port),
            "--bt-detach-seed-only=true",
            "--optimize-concurrent-downloads=true",
        ]

    def _start_aria2_on_port(self, port: int, aria2_args: list[str]) -> None:
        """Spawn aria2 with *aria2_args* and wait until its RPC answers on *port*.

        On success the process becomes the owned aria2 instance and
        ``rpc_port``/``rpc_endpoint`` point at it.

        Raises:
            RuntimeError: If the process exits during startup, or never
                answers ``getVersion`` within the eight probe attempts. In
                both cases the child is reaped and its stderr pipe closed.
        """
        proc = subprocess.Popen(aria2_args, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True)
        endpoint = rpc_url(port)
        for _ in range(8):
            if proc.poll() is not None:
                stderr_text = ""
                if proc.stderr is not None:
                    stderr_text = proc.stderr.read().strip()
                    proc.stderr.close()
                extra = f" | {stderr_text[:200]}" if stderr_text else ""
                raise RuntimeError(
                    f"aria2 exited immediately after start (port {port}, code {proc.returncode}){extra}"
                )
            try:
                aria2_request("getVersion", rpc_endpoint=endpoint, timeout_s=RPC_PROBE_TIMEOUT_S)
            except Exception:
                # Not listening yet; give the process a moment and probe again.
                time.sleep(0.05)
                continue
            self.aria2_proc = proc
            self.owns_aria2_process = True
            self.rpc_port = port
            self.rpc_endpoint = endpoint
            return

        # The server never answered: stop the child and reap it so neither a
        # zombie process nor an open pipe is left behind.
        proc.terminate()
        try:
            proc.wait(timeout=2.0)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
        if proc.stderr is not None:
            proc.stderr.close()
        raise RuntimeError(f"aria2 RPC is not responding on port {port}")

    def _reconnect_rpc_worker(self) -> tuple[bool, str]:
        """Worker job: re-establish an RPC endpoint.

        Returns ``(True, endpoint_url)`` on success and ``(False, error_text)``
        on failure. When the endpoint found first is not owned by this window,
        a dedicated local aria2 is (re)started on the preferred ports.
        """
        try:
            self.start_or_connect_aria2(prefer_local=False, force_restart_local=False)
            if not self.owns_aria2_process:
                self.start_or_connect_aria2(
                    prefer_local=True,
                    force_restart_local=True,
                    preferred_ports=list(PREFERRED_PORTS),
                )
        except Exception as exc:
            return False, str(exc)
        return True, self.rpc_endpoint

    def reconnect_rpc(self) -> None:
        """Button slot: start a background reconnect unless one is still running."""
        running = self.reconnect_future
        if running is not None and not running.done():
            return

        self.reconnect_button.setEnabled(False)
        self.rpc_label.setText("RPC: reconnecting...")
        self.reconnect_future = self.executor.submit(self._reconnect_rpc_worker)
        if self.reconnect_timer.isActive():
            return
        self.reconnect_timer.start()

    def _poll_reconnect_future(self) -> None:
        """Timer slot: report the outcome of a finished reconnect attempt."""
        fut = self.reconnect_future
        if fut is None:
            self.reconnect_timer.stop()
            return
        if not fut.done():
            return  # still running; check again on the next tick

        self.reconnect_future = None
        self.reconnect_timer.stop()
        self.reconnect_button.setEnabled(True)

        ok, payload = fut.result()
        if not ok:
            self.rpc_label.setText("RPC: reconnect failed")
            QtWidgets.QMessageBox.critical(self, "RPC", payload)
            return

        QtWidgets.QMessageBox.information(self, "RPC", f"Connected to {payload}")
        self.rpc_label.setText(f"RPC: connected to {payload}")
        self._set_rpc_controls_ready(True)

    def _looks_like_rpc_connection_drop(self, error_text: str) -> bool:
        lowered = error_text.lower()
        markers = (
            "rpc unavailable",
            "connection reset",
            "connection aborted",
            "connection refused",
            "timed out",
            "10053",
            "10054",
            "10061",
        )
        return any(marker in lowered for marker in markers)

    def _add_torrent_with_fallback(self, torrent_path: Path, b64_payload: str, options: Dict[str, str]) -> str:
        """Add a torrent via ``addTorrent``, restarting local aria2 on transport errors.

        Returns the gid reported by aria2. Up to three rounds are made; each
        round tries the add, and on a transport-looking failure restarts the
        local aria2 on that round's port subset and tries once more.
        ``addUri(file://...)`` is deliberately not used because aria2 may
        reject local file URIs.

        Raises:
            RuntimeError: Immediately for non-transport errors (e.g. invalid
                torrent data), or after all rounds have failed.
        """
        rotation = [self.rpc_port]
        rotation.extend(p for p in PREFERRED_PORTS if p != self.rpc_port)
        # Small port batches first to keep the add delay short.
        first_batch = rotation[:4]
        second_batch = rotation[4:8] or first_batch
        request_args = [b64_payload, [], options]

        failure = ""
        for batch in (first_batch, second_batch, rotation):
            try:
                return str(self.rpc_call("addTorrent", request_args)["result"])
            except Exception as first_exc:
                failure = str(first_exc)

            transport_problem = self._looks_like_rpc_connection_drop(failure) or "HTTP 5" in failure
            if not transport_problem:
                # Non-transport error: retrying cannot help.
                raise RuntimeError(failure) from None

            # Connection dropped or the remote aria2 is unhealthy: restart locally.
            try:
                self.start_or_connect_aria2(
                    prefer_local=True,
                    force_restart_local=True,
                    preferred_ports=list(batch),
                )
                return str(self.rpc_call("addTorrent", request_args)["result"])
            except Exception as retry_exc:
                failure = str(retry_exc)

        raise RuntimeError(
            "Unable to add torrent after multiple local RPC restarts and port rotations.\n"
            f"File: {torrent_path}\n"
            f"Last error: {failure}"
        )

    def build_options(self, mode: str) -> Dict[str, str]:
        """Return the per-download aria2 options for the speed preset *mode*.

        Base options are overlaid with the preset from MODES (an unknown mode
        contributes nothing), then connection, split and peer counts are
        clamped: connections to 1..16, split to 1..32, peers to at least 50.
        """
        tuning = dict(MODES.get(mode, {}))
        if mode == "🔥 Aggressive" and not FORCE_UNLIMITED_PEER_SPEED:
            tuning.pop("bt-request-peer-speed-limit", None)

        conn_limit = min(16, max(1, int(tuning.get("max-connection-per-server", "16"))))
        split_count = min(32, max(1, int(tuning.get("split", "16"))))
        peer_floor = max(50, int(tuning.get("bt-max-peers", "150")))

        base = {
            "dir": str(self.download_dir),
            "allow-overwrite": "true",
            "auto-file-renaming": "false",
            "follow-torrent": "true",
            "enable-dht": "true",
            "bt-enable-lpd": "true",
            "bt-enable-peer-exchange": "true",
            "seed-time": "0",
            "max-upload-limit": "0",
            "max-download-limit": "0",
            "min-split-size": "1M",
            "bt-tracker": ",".join(TRACKERS),
        }
        merged = {**base, **tuning}
        # Clamped values always win over whatever the preset supplied.
        merged["max-connection-per-server"] = str(conn_limit)
        merged["split"] = str(split_count)
        merged["bt-max-peers"] = str(peer_floor)
        return merged

    def _set_add_busy(self, busy: bool) -> None:
        """Record whether a torrent add is running and update the buttons to match."""
        self.add_in_flight = busy
        idle = not busy
        # Select/pause also stay disabled while the RPC bootstrap is pending.
        interactive = idle and (self.startup_future is None)
        self.select_button.setEnabled(interactive)
        self.pause_button.setEnabled(interactive)
        self.reconnect_button.setEnabled(idle)
        if not busy:
            return
        self.status_label.setText("Adding torrent...")

    def _extract_infohash(self, text: str) -> Optional[str]:
        import re

        match = re.search(r"([0-9a-fA-F]{40})", text)
        if not match:
            return None
        return match.group(1).lower()

    def _find_existing_gid_by_infohash(self, infohash: str) -> Optional[str]:
        """Search aria2's active, waiting and stopped lists for *infohash*.

        Returns the gid of the first matching entry, or None when nothing
        matches. A failing list query is skipped. The waiting and stopped
        queries each request entries 0-19 only.
        """
        wanted = ["gid", "infoHash", "status"]
        queries = (
            ("tellActive", []),
            ("tellWaiting", [0, 20]),
            ("tellStopped", [0, 20]),
        )
        for rpc_method, prefix in queries:
            try:
                found = self.rpc_call(rpc_method, prefix + [wanted])["result"]
            except Exception:
                continue
            entries = [found] if isinstance(found, dict) else found
            for entry in entries:
                if str(entry.get("infoHash", "")).lower() != infohash:
                    continue
                return str(entry.get("gid", "")) or None
        return None

    def _handle_duplicate_torrent_error(self, error_message: str) -> bool:
        """Attach the UI to an already-registered download named in *error_message*.

        Returns True when the info hash could be extracted and a matching gid
        was found (the window then tracks that task), False otherwise.
        """
        infohash = self._extract_infohash(error_message)
        gid = self._find_existing_gid_by_infohash(infohash) if infohash else None
        if not gid:
            return False

        self.gid_current = gid
        self.is_paused = False
        self.pause_button.setText("⏸ Pause")
        self.timer.start()
        self.status_label.setText("Torrent already registered, attached to existing task")
        QtWidgets.QMessageBox.information(
            self,
            "Already added",
            "This torrent is already in aria2. Attached to existing download task.",
        )
        return True

    def _add_torrent_worker(self, torrent_path: Path, mode: str) -> tuple[bool, str, str]:
        """Worker job: read the .torrent file and register it with aria2.

        Returns ``(True, gid, mode)`` on success and ``(False, error_text, mode)``
        on any failure; exceptions never leave the worker.
        """
        try:
            encoded = base64.b64encode(torrent_path.read_bytes()).decode("utf-8")
            gid = self._add_torrent_with_fallback(torrent_path, encoded, self.build_options(mode))
        except Exception as exc:
            return False, str(exc), mode
        return True, gid, mode

    def _on_add_torrent_finished(self, future: Future) -> None:
        """Apply the result of a finished add-torrent job to the UI.

        On success the speed tracking is reset and progress polling starts;
        a duplicate-torrent error is resolved by attaching to the existing
        task; every other failure is shown in an error dialog.
        """
        self._set_add_busy(False)
        ok, payload, mode = future.result()

        if not ok:
            message = payload
            is_duplicate = "already registered" in message.lower()
            if is_duplicate and self._handle_duplicate_torrent_error(message):
                return
            QtWidgets.QMessageBox.critical(
                self,
                "Add error",
                (
                    "Failed to add torrent after all automatic recovery attempts.\n\n"
                    f"Details: {message}"
                ),
            )
            return

        # Fresh download: reset everything derived from the previous task.
        self.gid_current = payload
        self.is_paused = False
        self.last_completed_length = 0
        self.last_speed_ts = time.time()
        self.speed_ema = 0.0
        self.no_seed_streak = 0
        self.pause_button.setText("⏸ Pause")
        self.status_label.setText(f"Mode '{mode}' started")
        self.timer.start()

        if not self.turbo_sticky_enabled:
            return
        # Sticky turbo: re-apply the tuning to the new task right away.
        profile, global_opts, task_opts = self._turbo_profile(speed_bps=0, peers=0, seeders=0)
        self._start_turbo_apply(show_popup=False, profile=profile, global_opts=global_opts, task_opts=task_opts)
        if not self.turbo_timer.isActive():
            self.turbo_timer.start()

    def _poll_add_future(self) -> None:
        """Timer slot: hand a finished add-torrent future to its completion handler."""
        fut = self.add_future
        if fut is None:
            self.add_timer.stop()
            return
        if not fut.done():
            return  # still running; check again on the next tick

        self.add_future = None
        self.add_timer.stop()
        self._on_add_torrent_finished(fut)

    def _peer_discovery_worker(self, gid: str) -> None:
        """Worker job: best-effort ``changeOption`` call that widens peer discovery for *gid*.

        Any failure is ignored on purpose; the call is an optional nudge.
        """
        boost = {
            "bt-tracker": ",".join(TRACKERS),
            "bt-max-peers": "800",
            "bt-request-peer-speed-limit": "0",
            "bt-enable-peer-exchange": "true",
        }
        try:
            aria2_request(
                "changeOption",
                [gid, boost],
                rpc_endpoint=self.rpc_endpoint,
                timeout_s=1.5,
            )
        except Exception:
            pass

    def _fetch_status_worker(self, gid: str, include_files: bool = False, include_peer_stats: bool = False) -> tuple[bool, object]:
        """Worker job: fetch the status of download *gid* from aria2.

        Args:
            gid: aria2 download identifier.
            include_files: Also request the ``files`` key.
            include_peer_stats: Fetch ``tellStatus`` and ``getPeers`` together
                through ``system.multicall`` to fill the peer counters.

        Returns:
            ``(True, status_dict)`` on success, where the dict additionally
            carries ``peerCount`` and ``peerSeederCount``; ``(False, error_text)``
            on failure. A request that times out is retried once, and two
            timeouts yield ``(False, "timed out")``.
        """
        keys = [
            "status",
            "completedLength",
            "totalLength",
            "downloadSpeed",
            "numSeeders",
            "connections",
            "errorCode",
            "errorMessage",
        ]
        if include_files:
            keys.append("files")

        for _ in range(2):
            try:
                if include_peer_stats:
                    calls = [
                        {"methodName": "aria2.tellStatus", "params": [gid, keys]},
                        {"methodName": "aria2.getPeers", "params": [gid]},
                    ]
                    multi = aria2_raw_request("system.multicall", [calls], rpc_endpoint=self.rpc_endpoint, timeout_s=STATUS_TIMEOUT_S)["result"]
                    first = multi[0] if isinstance(multi, list) and multi else None
                    if isinstance(first, dict):
                        # A failed sub-call comes back as an error struct instead
                        # of a one-item list. Surface it like the single-call
                        # path does rather than reporting an empty status.
                        raise RuntimeError(
                            str(first.get("message") or first.get("faultString") or "tellStatus failed")
                        )
                    status = first[0] if isinstance(first, list) and first else {}
                    second = multi[1] if isinstance(multi, list) and len(multi) > 1 else None
                    # A failed getPeers is tolerated: the peer list is simply empty.
                    peers = second[0] if isinstance(second, list) and second else []
                else:
                    status = aria2_request(
                        "tellStatus",
                        [gid, keys],
                        rpc_endpoint=self.rpc_endpoint,
                        timeout_s=STATUS_TIMEOUT_S,
                    )["result"]
                    peers = []

                status["peerCount"] = len(peers) if isinstance(peers, list) else 0
                if isinstance(peers, list):
                    status["peerSeederCount"] = sum(1 for p in peers if str(p.get("seeder", "")).lower() in {"true", "1"})
                else:
                    status["peerSeederCount"] = 0
                return True, status
            except Exception as exc:
                last = str(exc)
                if "timed out" in last.lower():
                    # Brief pause, then one more attempt.
                    time.sleep(0.05)
                    continue
                return False, last
        return False, "timed out"

    def _on_status_fetched(self, future: Future) -> None:
        """Apply the result of a finished status poll to the window.

        ``future`` is expected to resolve to an ``(ok, payload)`` pair. When
        ``ok`` is false, ``payload`` is treated as an error message; otherwise
        it is the status mapping whose keys are read below.

        On success the method refreshes the progress bar and the
        speed/status/file labels, adjusts the polling timer interval, may
        submit peer-discovery work and turbo maintenance, and performs the
        end-of-task handling for the ``"complete"`` and ``"error"`` states.
        """
        ok, payload = future.result()
        if not ok:
            if "timed out" in str(payload).lower():
                # Timeouts are counted silently; the label only changes once
                # six of them have occurred back to back.
                self.status_timeout_streak += 1
                if self.status_timeout_streak >= 6:
                    self.status_label.setText("RPC is slow, retrying...")
                return
            # Any other failure resets the timeout streak and is shown as-is.
            self.status_timeout_streak = 0
            self.status_label.setText(f"Request error: {payload}")
            return

        self.status_timeout_streak = 0
        status = payload
        # Remember the state as a string for code that reads last_known_state.
        self.last_known_state = str(status.get("status", ""))

        # NOTE(review): assumes the numeric fields arrive as decimal strings
        # (the fallbacks used here are strings) — confirm against the RPC reply.
        comp = int(status.get("completedLength", "0"))
        # Clamp to at least 1 so the percentage division cannot divide by zero.
        total = max(1, int(status.get("totalLength", "1")))
        reported_speed = int(status.get("downloadSpeed", "0"))
        progress = min(100, int((comp / total) * 100))

        # Derive a locally measured speed from the byte delta since the last
        # poll; dt is clamped to avoid dividing by zero, and a negative delta
        # is treated as zero.
        now = time.time()
        dt = max(0.001, now - self.last_speed_ts)
        observed_speed = max(0.0, (comp - self.last_completed_length) / dt)
        self.last_completed_length = comp
        self.last_speed_ts = now
        # Exponential moving average (70% previous, 30% new); while the
        # average is still <= 0 the new sample is taken directly.
        self.speed_ema = observed_speed if self.speed_ema <= 0 else (self.speed_ema * 0.7 + observed_speed * 0.3)
        display_speed = int(self.speed_ema)

        # Two seeder counts may be present; the larger one is used.
        # `or "0"` guards against an empty string before int().
        seeders_rpc = int(str(status.get("numSeeders", "0")) or "0")
        seeders_peer = int(str(status.get("peerSeederCount", "0")) or "0")
        seeders = max(seeders_rpc, seeders_peer)
        # Prefer "peerCount"; fall back to "connections" when it is absent.
        peers = int(str(status.get("peerCount", status.get("connections", "0"))) or "0")

        self.progress.setValue(progress)
        speed_line = f"{human_bytes(display_speed)}/s (real) | peers: {peers} | seeders: {seeders}"
        # Only touch the label when the text actually changed.
        if speed_line != self.last_speed_line:
            self.speed_label.setText(speed_line)
            self.last_speed_line = speed_line

        state = status.get("status", "")

        # bounded peer discovery submissions to avoid worker queue flood/freeze
        # (a finished discovery future is cleared so a new one may be submitted)
        if self.peer_discovery_future and self.peer_discovery_future.done():
            self.peer_discovery_future = None

        if state == "active" and seeders == 0:
            # On every 12th consecutive seederless poll, submit peer discovery
            # unless a previous submission is still outstanding.
            self.no_seed_streak += 1
            if self.no_seed_streak % 12 == 0 and self.gid_current and self.peer_discovery_future is None:
                self.peer_discovery_future = self.executor.submit(self._peer_discovery_worker, self.gid_current)
        else:
            self.no_seed_streak = 0

        if self.turbo_sticky_enabled and state == "active":
            self._schedule_turbo_maintenance(display_speed, peers, seeders)

        status_line = f"{state} | {progress}% | raw: {human_bytes(reported_speed)}/s"
        # Same change-detection as for the speed label above.
        if status_line != self.last_status_line:
            self.status_label.setText(status_line)
            self.last_status_line = status_line

        # Use the shorter interval while active at >= 1 MiB/s (smoothed),
        # the longer one otherwise.
        # NOTE(review): presumably milliseconds if self.timer is a QTimer — confirm.
        target_interval = 550 if state == "active" and display_speed >= 1024 * 1024 else 900
        if self.timer.interval() != target_interval:
            self.timer.setInterval(target_interval)

        # When a file list is present, show the base name of the first entry.
        if status.get("files"):
            file_name = Path(status["files"][0].get("path", "")).name
            self.file_label.setText(f"📄 {file_name}")

        if state == "complete":
            # Stop polling and turbo, reset the task state, tell the user,
            # then open the download directory via the webbrowser module.
            self.timer.stop()
            self.turbo_sticky_enabled = False
            self.turbo_timer.stop()
            self.turbo_poll_timer.stop()
            self.gid_current = None
            self.is_paused = False
            self.pause_button.setText("▶️ Start")
            QtWidgets.QMessageBox.information(self, "Done", f"File downloaded to\n{self.download_dir}")
            webbrowser.open(str(self.download_dir))
            return

        if state == "error":
            self.timer.stop()
            self.turbo_sticky_enabled = False
            self.turbo_timer.stop()
            self.turbo_poll_timer.stop()
            error_code = str(status.get("errorCode", ""))
            error_message = str(status.get("errorMessage", "Unknown aria2 error."))
            # Error code "12" is first offered to the duplicate-torrent
            # handler; if it reports success, the task state is left untouched
            # and no error dialog is shown.
            if error_code == "12" and self._handle_duplicate_torrent_error(error_message):
                return
            self.gid_current = None
            self.is_paused = False
            self.pause_button.setText("▶️ Start")
            QtWidgets.QMessageBox.critical(
                self,
                "Download error",
                (
                    "Download failed.\n\n"
                    f"aria2 code: {error_code or 'n/a'}\n"
                    f"aria2 message: {error_message}\n\n"
                    "Tips:\n"
                    "- Try another speed mode (Safe/Balanced).\n"
                    "- Verify torrent has active seeders.\n"
                    "- Try Reconnect RPC, then start again."
                ),
            )

    def choose_torrent(self) -> bool:
        """Ask the user for a .torrent file and queue it for adding.

        Returns ``False`` when an add is already in flight or the dialog was
        dismissed without a selection; returns ``True`` once the add job has
        been submitted to the executor and the add timer has been started.
        """
        if self.add_in_flight:
            return False

        selected, _chosen_filter = QtWidgets.QFileDialog.getOpenFileName(
            self,
            "Select torrent",
            str(Path.home()),
            "Torrent (*.torrent)",
        )
        if not selected:
            return False

        # Capture the inputs for the worker before switching the UI to busy.
        job_args = (Path(selected), self.mode_box.currentText())

        self._set_add_busy(True)
        self.add_future = self.executor.submit(self._add_torrent_worker, *job_args)
        self.add_timer.start()
        return True

    def update_progress(self) -> None:
        """Advance the status polling cycle by one step.

        Does nothing without a current gid. A finished status future is handed
        to ``_on_status_fetched`` and the call ends there; otherwise, when no
        request is outstanding, a new status fetch is submitted. The file list
        is requested on every 6th tick and peer statistics on every 10th.
        """
        if not self.gid_current:
            return

        pending = self.status_future

        # A finished request is consumed first; the next one is submitted on
        # a later call.
        if pending and pending.done():
            self.status_future = None
            self.progress_request_in_flight = False
            self._on_status_fetched(pending)
            return

        # Still waiting for an outstanding request: nothing to do.
        if pending or self.progress_request_in_flight:
            return

        self.progress_request_in_flight = True
        gid = self.gid_current
        tick = self.status_tick
        self.status_tick = tick + 1
        self.status_future = self.executor.submit(
            self._fetch_status_worker,
            gid,
            tick % 6 == 0,
            tick % 10 == 0,
        )

    def _fetch_gid_state(self) -> Optional[str]:
        if not self.gid_current:
            return None
        try:
            res = self.rpc_call_fast("tellStatus", [self.gid_current, ["status"]])["result"]
            return str(res.get("status", ""))
        except Exception:
            return None

    def start_pause_resume(self) -> None:
        """Handle the Start / Pause / Resume button.

        Without a current task this opens the torrent picker. Otherwise the
        last known state (or, if that is empty, a fresh RPC probe) decides
        what happens: a paused task is unpaused, an active task is paused, and
        other states only produce an informational dialog. A task that is
        gone, failed or finished resets the button to its Start state.
        """
        gid = self.gid_current
        if not gid:
            self.choose_torrent()
            return

        state = self.last_known_state or self._fetch_gid_state()

        if state in {None, "", "error", "complete", "removed"}:
            self.gid_current = None
            self.is_paused = False
            self.pause_button.setText("▶️ Start")
            QtWidgets.QMessageBox.information(
                self,
                "No active download",
                "Previous task is no longer active. Select a torrent to start a new download.",
            )
            return

        # state -> (RPC method to send, new button caption, paused flag afterwards)
        toggles = {
            "paused": ("unpause", "⏸ Pause", False),
            "active": ("pause", "▶️ Resume", True),
        }

        try:
            if state in toggles:
                rpc_method, caption, paused_after = toggles[state]
                self.rpc_call(rpc_method, [gid])
                self.pause_button.setText(caption)
                self.is_paused = paused_after
            elif state == "waiting":
                # waiting often cannot be unpaused/paused yet; don't send invalid command
                self.pause_button.setText("▶️ Resume")
                self.is_paused = True
                QtWidgets.QMessageBox.information(
                    self,
                    "Task state",
                    "Task is waiting in queue. Pause/Resume is not available yet.",
                )
            else:
                # Any remaining state is by construction not "paused".
                self.is_paused = False
                QtWidgets.QMessageBox.information(self, "Task state", f"Current task state: {state}")
        except Exception as exc:
            QtWidgets.QMessageBox.critical(self, "Error", f"Failed to switch state:\n{exc}")

    def _turbo_profile(self, speed_bps: int, peers: int, seeders: int) -> tuple[str, Dict[str, str], Dict[str, str]]:
        """Choose the turbo option set for the current download conditions.

        Returns ``(profile_name, global_options, task_options)``. The profile
        is ``"bootstrap"`` with fewer than 8 peers or zero seeders,
        ``"throughput"`` when the speed exceeds 8 MiB/s, and ``"balanced"``
        otherwise. Both option dicts carry the joined ``TRACKERS`` list, and
        the task options always lift the per-task download/upload limits.
        """
        tracker_csv = ",".join(TRACKERS)

        # Decide the profile name first, then look its options up below.
        if peers < 8 or seeders == 0:
            profile = "bootstrap"
        elif speed_bps > 8 * 1024 * 1024:
            profile = "throughput"
        else:
            profile = "balanced"

        per_profile: Dict[str, Dict[str, str]] = {
            "bootstrap": {
                "max-connection-per-server": "12",
                "split": "12",
                "bt-max-peers": "320",
                "bt-request-peer-speed-limit": "0",
                "bt-enable-peer-exchange": "true",
                "bt-tracker-connect-timeout": "6",
                "bt-tracker-timeout": "6",
                "min-split-size": "2M",
            },
            "throughput": {
                "max-connection-per-server": "16",
                "split": "24",
                "bt-max-peers": "500",
                "bt-request-peer-speed-limit": "0",
                "min-split-size": "4M",
            },
            "balanced": {
                "max-connection-per-server": "14",
                "split": "16",
                "bt-max-peers": "380",
                "bt-request-peer-speed-limit": "0",
                "min-split-size": "2M",
            },
        }

        # Profile-specific keys come first, followed by the shared overrides.
        task_opts = {
            **per_profile[profile],
            "max-download-limit": "0",
            "max-upload-limit": "0",
            "bt-tracker": tracker_csv,
        }
        global_opts = {
            "max-overall-download-limit": "0",
            "max-overall-upload-limit": "0",
            "max-concurrent-downloads": "6",
            "bt-max-open-files": "256",
            "bt-tracker": tracker_csv,
        }
        return profile, global_opts, task_opts

    def _apply_turbo_worker(self, gid: Optional[str], global_opts: Dict[str, str], task_opts: Dict[str, str]) -> tuple[bool, bool, list[str], str]:
        """Send the turbo options to aria2 (submitted to the executor by ``_start_turbo_apply``).

        The global options, plus the task options when ``gid`` is given, are
        first sent as one ``system.multicall`` batch. If that raises, the same
        changes are sent as individual requests instead.

        Returns ``(ok, task_applied, skipped, error)``: ``(True, bool(gid), [], "")``
        on success and ``(False, False, [], message)`` when the fallback fails too.
        """
        rpc_timeout = 1.8
        try:
            batch = [{"methodName": "aria2.changeGlobalOption", "params": [global_opts]}]
            if gid:
                batch += [{"methodName": "aria2.changeOption", "params": [gid, task_opts]}]

            try:
                aria2_raw_request("system.multicall", [batch], rpc_endpoint=self.rpc_endpoint, timeout_s=rpc_timeout)
            except Exception:
                # Batch call failed: retry the same changes one request at a time.
                aria2_request("changeGlobalOption", [global_opts], rpc_endpoint=self.rpc_endpoint, timeout_s=rpc_timeout)
                if gid:
                    aria2_request("changeOption", [gid, task_opts], rpc_endpoint=self.rpc_endpoint, timeout_s=rpc_timeout)
        except Exception as exc:
            return False, False, [], str(exc)
        return True, bool(gid), [], ""

    def _start_turbo_apply(self, show_popup: bool, profile: str, global_opts: Dict[str, str], task_opts: Dict[str, str]) -> None:
        if self.turbo_apply_in_flight:
            return
        self.turbo_apply_in_flight = True
        self.turbo_apply_show_popup = show_popup
        gid = self.gid_current
        self.turbo_apply_future = self.executor.submit(self._apply_turbo_worker, gid, global_opts, task_opts)
        self.turbo_last_profile = profile
        if not self.turbo_poll_timer.isActive():
            self.turbo_poll_timer.start()

    def _poll_turbo_future(self) -> None:
        if not self.turbo_apply_future:
            self.turbo_apply_in_flight = False
            self.turbo_poll_timer.stop()
            return
        if not self.turbo_apply_future.done():
            return

        fut = self.turbo_apply_future
        self.turbo_apply_future = None
        self.turbo_apply_in_flight = False
        self.turbo_poll_timer.stop()

        ok, task_applied, skipped, err = fut.result()
        if not ok:
            self.turbo_fail_streak += 1
            self.turbo_backoff_until = time.time() + TURBO_FAIL_BACKOFF_S
            if self.turbo_apply_show_popup:
                QtWidgets.QMessageBox.critical(self, "Error", err)
            return

        self.turbo_fail_streak = 0
        self.turbo_backoff_until = 0.0
        self.turbo_last_apply_ts = time.time()
        self.turbo_last_maintenance_ts = self.turbo_last_apply_ts
        if task_applied:
            self.status_label.setText(f"Turbo active ({self.turbo_last_profile})")
        else:
            self.status_label.setText("Turbo active globally (waiting for task)")

        if self.turbo_apply_show_popup:
            QtWidgets.QMessageBox.information(
                self,
                "Turbo",
                "Smart Turbo enabled: lightweight adaptive profile with low RPC overhead.",
            )

    def _schedule_turbo_maintenance(self, speed_bps: int, peers: int, seeders: int) -> None:
        now = time.time()
        if self.turbo_apply_in_flight:
            return
        if now < self.turbo_backoff_until:
            return

        profile, global_opts, task_opts = self._turbo_profile(speed_bps, peers, seeders)
        profile_changed = profile != self.turbo_last_profile
        periodic_due = (now - self.turbo_last_maintenance_ts) >= TURBO_MAINTENANCE_INTERVAL_S

        if profile_changed or periodic_due:
            self._start_turbo_apply(show_popup=False, profile=profile, global_opts=global_opts, task_opts=task_opts)

    def _maintain_turbo(self) -> None:
        if not self.turbo_sticky_enabled:
            return
        if not self.gid_current:
            return
        state = self.last_known_state
        if state in {"complete", "error", "removed"}:
            self.turbo_sticky_enabled = False
            self.turbo_timer.stop()
            return

    def apply_turbo(self) -> None:
        """Enable sticky turbo and kick off the first apply with a popup.

        The initial profile is computed for zero speed, peers and seeders.
        Afterwards the turbo timer is started if idle and the hints panel is
        refreshed.
        """
        self.turbo_sticky_enabled = True

        initial_profile, initial_global, initial_task = self._turbo_profile(speed_bps=0, peers=0, seeders=0)
        self._start_turbo_apply(
            show_popup=True,
            profile=initial_profile,
            global_opts=initial_global,
            task_opts=initial_task,
        )

        maintenance_timer = self.turbo_timer
        if not maintenance_timer.isActive():
            maintenance_timer.start()

        self.refresh_speed_hints()

    def refresh_speed_hints(self) -> None:
        """Fill the hints box with the five fixed notes about Smart Turbo."""
        hint_lines = (
            "1) Smart Turbo applies a low-overhead profile and adapts only when needed.",
            "2) It uses multicall batching when available to cut RPC round-trips.",
            "3) If RPC is unstable, turbo maintenance enters backoff automatically.",
            "4) Throughput profile is used only when peers are healthy and speed is already high.",
            "5) If port 6800 is busy, the app will automatically choose another RPC port.",
        )
        hint_text = "\n".join(hint_lines)
        self.hints.setPlainText(hint_text)

    def closeEvent(self, event) -> None:  # type: ignore[override]
        """Tear down background activity before the window closes.

        Stops the window's timers, shuts the executor down without waiting
        (cancelling queued futures), terminates the aria2 process if this
        window owns it and it is still running, then defers to the base class.
        """
        # Stopped one by one, in this order.
        for timer_attr in ("timer", "turbo_timer", "turbo_poll_timer", "startup_timer", "reconnect_timer"):
            getattr(self, timer_attr).stop()

        self.executor.shutdown(wait=False, cancel_futures=True)

        if self.owns_aria2_process:
            proc = self.aria2_proc
            # poll() returning None means the process has not exited yet.
            if proc and proc.poll() is None:
                proc.terminate()

        super().closeEvent(event)


def main() -> None:
    """Create the Qt application, show the main window and run the event loop.

    If constructing the window raises, the error text is shown in a
    "Startup error" dialog and the process exits with status 1. Otherwise the
    process exits with the event loop's return value.
    """
    qt_app = QtWidgets.QApplication(sys.argv)

    try:
        main_window = TorrentWindow()
    except Exception as startup_error:
        QtWidgets.QMessageBox.critical(None, "Startup error", str(startup_error))
        sys.exit(1)

    main_window.show()
    exit_code = qt_app.exec()
    sys.exit(exit_code)


# Launch the GUI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

Source code is fully readable — audit it, tweak the speed modes, add trackers, change the download folder. It’s yours.


⚡ Quick Hits

| Want | Do |
|---|---|
| 🧲 Download a torrent | Run script → Select .torrent → done |
| 🚀 Max speed | Switch to Aggressive mode + hit Apply Turbo |
| ⏸️ Pause & resume | Click Pause → close app → reopen → load same .torrent |
| 🔍 Read the code | It’s ~1100 lines of Python — no hidden magic |

One script. Zero bloat. Your downloads, your rules.

8 Likes

Cool new update

Yes, the errors have been eliminated and the program has been fully optimized. The test version currently has only turbo mode, and it will be updated. New code has been added.