iptvtools.utils

src/iptvtools/utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python
"""Relevant Utilities.

File: utils.py
Author: huxuan
Email: i(at)huxuan.org
"""

import json
import logging
import socket
import struct
from subprocess import (
    PIPE,
    Popen,
    TimeoutExpired,
)
from typing import Any
from urllib.parse import urlparse

import requests

from iptvtools.config import Config

# Base ffprobe invocation: hide the banner, report only video streams
# (`-select_streams v`) as compact JSON (`-of json=c=1`) and silence logging
# (`-v quiet`). The stream URL is appended to this command when probing.
PROBE_COMMAND = (
    "ffprobe -hide_banner -show_streams -select_streams v -of json=c=1 -v quiet"
)

# URL schemes that this module routes through the UDP-specific code paths
# (udpxy conversion and the UDP connectivity check).
UDP_SCHEME = (
    "udp",
    "rtp",
)


def convert_url_with_udpxy(orig_url: str, udpxy: str) -> str:
    """Convert url with udpxy.

    Args:
        orig_url: Original stream URL.
        udpxy: Base address of the udpxy proxy, with or without a trailing
            slash.

    Returns:
        ``<udpxy>/<scheme>/<netloc>`` when the scheme of ``orig_url`` is listed
        in ``UDP_SCHEME``; otherwise ``orig_url`` unchanged.
    """
    parsed_url = urlparse(orig_url)
    if parsed_url.scheme not in UDP_SCHEME:
        return orig_url
    # Strip trailing slashes so a base such as "http://host:4022/" does not
    # produce a "//udp/..." path in the converted URL.
    base = udpxy.rstrip("/")
    return f"{base}/{parsed_url.scheme}/{parsed_url.netloc}"


def unify_title_and_id(item: dict[str, Any]) -> dict[str, Any]:
    """Unify title and id.

    Applies the configured title replacements to ``item["title"]``, derives
    ``item["id"]`` from the ``tvg-name`` parameter (falling back to the title),
    then applies the configured id replacements. The item is modified in place
    and also returned.
    """
    title_map = Config.title_unifiers
    # Replacements are applied in sorted key order.
    for needle in sorted(title_map):
        current_title = item["title"]
        if needle not in current_title:
            continue
        item["title"] = current_title.replace(needle, title_map[needle])

    params = item.get("params", {})
    item["id"] = params["tvg-name"] if "tvg-name" in params else item["title"]

    id_map = Config.id_unifiers
    for needle in sorted(id_map):
        current_id = item["id"]
        if needle not in current_id:
            continue
        item["id"] = current_id.replace(needle, id_map[needle])

    return item


def probe(url: str, timeout: int | None = None) -> Any:
    """Invoke probe to get stream information.

    Args:
        url: Stream address, handed to ffprobe as a single argument.
        timeout: Seconds to wait for ffprobe to finish; ``None`` waits
            without limit.

    Returns:
        The parsed JSON document printed by ffprobe, or ``None`` when the
        process timed out, printed nothing, or printed output that could not
        be decoded as UTF-8 JSON.
    """
    # Split only the fixed command on whitespace. The URL stays one argv entry
    # so whitespace inside it cannot turn into extra ffprobe arguments.
    command = [*PROBE_COMMAND.split(), url]
    outs = None
    with Popen(command, stdout=PIPE, stderr=PIPE) as proc:  # noqa: S603
        try:
            outs, _ = proc.communicate(timeout=timeout)
        except TimeoutExpired:
            # Leaving the ``with`` block closes the pipes and waits for the
            # killed process.
            proc.kill()
    if not outs:
        return None
    try:
        return json.loads(outs.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        logging.error(exc)
    return None


def check_stream(url: str, timeout: int | None = None) -> int:
    """Check stream information and return height.

    Returns the largest ``height`` among the probed streams, or ``0`` when the
    probe result is empty or lists no streams.
    """
    info = probe(url, timeout)
    if not info:
        return 0
    streams = info.get("streams")
    if not streams:
        return 0
    # A stream entry without a "height" key counts as height 0.
    return max(int(entry.get("height", 0)) for entry in streams)


def check_connectivity(url: str, timeout: int | None = None) -> bool:
    """Check connectivity.

    URLs whose scheme is listed in ``UDP_SCHEME`` go to the UDP check with
    only their network location; every other URL goes to the HTTP check.
    """
    parts = urlparse(url)
    uses_udp = parts.scheme in UDP_SCHEME
    if not uses_udp:
        return check_http_connectivity(url, timeout)
    return check_udp_connectivity(parts.netloc, timeout)


def check_udp_connectivity(url: str, timeout: int | None = None) -> bool:
    """Check UDP connectivity.

    Joins the given IPv4 multicast group and waits for one datagram.

    Args:
        url: Network location in ``"<ipv4-address>:<port>"`` form.
        timeout: Seconds to wait for data; ``None`` blocks until data arrives.

    Returns:
        ``True`` if a non-empty datagram was received before the timeout,
        ``False`` otherwise.

    Raises:
        ValueError: If ``url`` has no ``:<port>`` part or the port is not an
            integer.
        OSError: If the address is invalid or a socket operation fails.
    """
    ipaddr, port = url.rsplit(":", 1)
    # "=" selects standard sizes without alignment padding, so the request is
    # exactly 8 bytes (group address + interface address) on every platform.
    mreq = struct.pack("=4sl", socket.inet_aton(ipaddr), socket.INADDR_ANY)
    # The context manager closes the socket on every exit path.
    with socket.socket(
        socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP
    ) as sock:
        sock.settimeout(timeout)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # SO_REUSEPORT is not defined on every platform.
        if hasattr(socket, "SO_REUSEPORT"):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        sock.bind(("", int(port)))
        sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        try:
            return bool(sock.recv(10240))
        except TimeoutError:
            return False


def check_http_connectivity(url: str, timeout: int | None = None) -> bool:
    """Check HTTP connectivity.

    Args:
        url: Address to request with HTTP GET.
        timeout: Timeout passed to ``requests.get``; ``None`` disables it.

    Returns:
        The ``ok`` flag of the response, or ``False`` when the request raised
        ``requests.RequestException``.
    """
    try:
        # With ``stream=True`` the body is not downloaded, so the response is
        # closed explicitly to release the connection.
        with requests.get(url, timeout=timeout, stream=True) as response:
            return response.ok
    except requests.RequestException:
        return False


def height_to_resolution(height: int) -> str:
    """Convert height to resolution.

    A falsy height gives an empty string. Heights of at least 720 map to the
    highest label whose threshold they reach; anything lower is rendered as
    ``"<height>p"``.
    """
    if not height:
        return ""
    # Ordered from largest to smallest so the first threshold reached wins.
    labels = (
        (4320, "8K"),
        (2160, "4K"),
        (1080, "1080p"),
        (720, "720p"),
    )
    for threshold, label in labels:
        if height >= threshold:
            return label
    return f"{height}p"