1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
#!/usr/bin/env python
"""Relevant Utilities.
File: utils.py
Author: huxuan
Email: i(at)huxuan.org
"""
import json
import logging
import socket
import struct
from subprocess import (
PIPE,
Popen,
TimeoutExpired,
)
from typing import Any
from urllib.parse import urlparse
import requests
from iptvtools.config import Config
PROBE_COMMAND = (
"ffprobe -hide_banner -show_streams -select_streams v -of json=c=1 -v quiet"
)
UDP_SCHEME = (
"udp",
"rtp",
)
def convert_url_with_udpxy(orig_url: str, udpxy: str) -> str:
"""Convert url with udpxy."""
parsed_url = urlparse(orig_url)
if parsed_url.scheme in UDP_SCHEME:
return f"{udpxy}/{parsed_url.scheme}/{parsed_url.netloc}"
return orig_url
def unify_title_and_id(item: dict[str, Any]) -> dict[str, Any]:
"""Unify title and id."""
for title_unifier in sorted(Config.title_unifiers):
if title_unifier in item["title"]:
item["title"] = item["title"].replace(
title_unifier, Config.title_unifiers[title_unifier]
)
if "tvg-name" in item.get("params", {}):
item["id"] = item["params"]["tvg-name"]
else:
item["id"] = item["title"]
for id_unifier in sorted(Config.id_unifiers):
if id_unifier in item["id"]:
item["id"] = item["id"].replace(id_unifier, Config.id_unifiers[id_unifier])
return item
def probe(url: str, timeout: int | None = None) -> Any:
"""Invoke probe to get stream information."""
outs = None
with Popen( # noqa: S603
f"{PROBE_COMMAND} {url}".split(), stdout=PIPE, stderr=PIPE
) as proc:
try:
outs, _ = proc.communicate(timeout=timeout)
except TimeoutExpired:
proc.kill()
if outs:
try:
return json.loads(outs.decode("utf-8"))
except json.JSONDecodeError as exc:
logging.error(exc)
return None
def check_stream(url: str, timeout: int | None = None) -> int:
"""Check stream information and return height."""
stream_info = probe(url, timeout)
if stream_info and stream_info.get("streams"):
return max([int(stream.get("height", 0)) for stream in stream_info["streams"]])
return 0
def check_connectivity(url: str, timeout: int | None = None) -> bool:
"""Check connectivity."""
parsed_url = urlparse(url)
if parsed_url.scheme in UDP_SCHEME:
return check_udp_connectivity(parsed_url.netloc, timeout)
return check_http_connectivity(url, timeout)
def check_udp_connectivity(url: str, timeout: int | None = None) -> bool:
"""Check UDP connectivity."""
ipaddr, port = url.rsplit(":", 1)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.settimeout(timeout)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(("", int(port)))
mreq = struct.pack("4sl", socket.inet_aton(ipaddr), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
try:
if sock.recv(10240):
return True
except TimeoutError:
pass
return False
def check_http_connectivity(url: str, timeout: int | None = None) -> bool:
"""Check HTTP connectivity."""
try:
return requests.get(url, timeout=timeout, stream=True).ok
except requests.RequestException:
return False
def height_to_resolution(height: int) -> str:
"""Convert height to resolution."""
if not height:
return ""
if height >= 4320:
return "8K"
if height >= 2160:
return "4K"
if height >= 1080:
return "1080p"
if height >= 720:
return "720p"
return f"{height}p"
|