-
Notifications
You must be signed in to change notification settings - Fork 243
/
Copy pathmain.py
163 lines (137 loc) · 7.39 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
import re
import requests
import logging
from collections import OrderedDict
from datetime import datetime
import config
# Send INFO-and-above records both to function.log (opened in "w" mode, so it
# is truncated on every run) and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("function.log", "w", encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
def parse_template(template_file):
    """Parse a channel template file into an ordered category -> names map.

    Blank lines and lines starting with ``#`` are ignored. A line containing
    ``#genre#`` opens a category named by the text before its first comma.
    Every other line contributes the text before its first comma as a channel
    name to the most recently opened category; such lines that appear before
    any category header are dropped.

    Args:
        template_file: Path of the UTF-8 encoded template file.

    Returns:
        An ``OrderedDict`` mapping each category name to the list of channel
        names, both in file order.
    """
    template_channels = OrderedDict()
    current_category = None

    # "utf-8-sig" reads plain UTF-8 unchanged but also drops a leading BOM.
    # str.strip() does not remove U+FEFF, so with plain "utf-8" a BOM would
    # become part of the first category (or channel) name.
    with open(template_file, "r", encoding="utf-8-sig") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            if "#genre#" in line:
                current_category = line.split(",")[0].strip()
                # setdefault keeps channels already collected when the same
                # category header appears more than once in the template.
                template_channels.setdefault(current_category, [])
            elif current_category:
                channel_name = line.split(",")[0].strip()
                template_channels[current_category].append(channel_name)

    return template_channels
# (connect, read) timeout in seconds; without one, a single unresponsive
# source would block the whole run indefinitely.
_REQUEST_TIMEOUT = (5, 15)


def _parse_m3u_lines(lines):
    """Collect ``(name, url)`` pairs per ``group-title`` from M3U lines."""
    channels = OrderedDict()
    current_category = None
    channel_name = None
    for raw_line in lines:
        line = raw_line.strip()
        if line.startswith("#EXTINF"):
            match = re.search(r'group-title="(.*?)",(.*)', line)
            if match:
                current_category = match.group(1).strip()
                channel_name = match.group(2).strip()
                channels.setdefault(current_category, [])
            else:
                # An #EXTINF line we cannot parse must not let the URL that
                # follows it be filed under the previously seen channel.
                current_category = None
                channel_name = None
        elif line and not line.startswith("#"):
            if current_category and channel_name:
                channels[current_category].append((channel_name, line))
    return channels


def _parse_txt_lines(lines):
    """Collect ``(name, url)`` pairs per ``#genre#`` category from txt lines."""
    channels = OrderedDict()
    current_category = None
    for raw_line in lines:
        line = raw_line.strip()
        if "#genre#" in line:
            current_category = line.split(",")[0].strip()
            # Keep earlier entries if the same category header repeats,
            # matching how the M3U parser treats repeated groups.
            channels.setdefault(current_category, [])
        elif current_category:
            match = re.match(r"^(.*?),(.*?)$", line)
            if match:
                channel_name = match.group(1).strip()
                channel_url = match.group(2).strip()
                channels[current_category].append((channel_name, channel_url))
            elif line:
                # A non-empty line without a comma is kept as a name with an
                # empty URL.
                channels[current_category].append((line, ''))
    return channels


def fetch_channels(url):
    """Download one source list and parse it into categories of channels.

    The payload is treated as M3U when any of its first 15 lines contains
    ``#EXTINF``; otherwise it is parsed as the ``name,url`` text format with
    ``#genre#`` category headers.

    Args:
        url: Address of the source list to download.

    Returns:
        An ``OrderedDict`` mapping category name to a list of
        ``(channel_name, channel_url)`` tuples. It is empty when the request
        fails; request errors (including timeouts) are logged, not raised.
    """
    channels = OrderedDict()
    try:
        response = requests.get(url, timeout=_REQUEST_TIMEOUT)
        response.raise_for_status()
        response.encoding = 'utf-8'
        lines = response.text.split("\n")
        is_m3u = any("#EXTINF" in line for line in lines[:15])
        source_type = "m3u" if is_m3u else "txt"
        logging.info(f"url: {url} 获取成功,判断为{source_type}格式")

        channels = _parse_m3u_lines(lines) if is_m3u else _parse_txt_lines(lines)

        if channels:
            categories = ", ".join(channels.keys())
            logging.info(f"url: {url} 爬取成功✅,包含频道分类: {categories}")
    except requests.RequestException as e:
        logging.error(f"url: {url} 爬取失败❌, Error: {e}")

    return channels
def match_channels(template_channels, all_channels):
    """Attach the fetched URLs to the channels requested by the template.

    Args:
        template_channels: Mapping of category name to a list of channel
            names, as read from the template.
        all_channels: Mapping of source category to a list of
            ``(channel_name, channel_url)`` tuples.

    Returns:
        An ``OrderedDict`` with one entry per template category, each an
        ``OrderedDict`` mapping a channel name to the list of URLs whose
        source name is exactly equal to it. Source categories are ignored
        when matching. Channels with no matching URL are omitted, and URLs
        keep the order in which they occur in ``all_channels``.
    """
    # Index the fetched channels by name once, instead of rescanning every
    # source entry for every template channel.
    urls_by_name = {}
    for online_channel_list in all_channels.values():
        for online_channel_name, online_channel_url in online_channel_list:
            urls_by_name.setdefault(online_channel_name, []).append(online_channel_url)

    matched_channels = OrderedDict()
    for category, channel_list in template_channels.items():
        matched = matched_channels[category] = OrderedDict()
        for channel_name in channel_list:
            if channel_name in urls_by_name:
                # extend() copies into a per-category list so results never
                # share a list object with the index or with each other.
                matched.setdefault(channel_name, []).extend(urls_by_name[channel_name])
    return matched_channels
def filter_source_urls(template_file):
    """Fetch every configured source and match it against the template.

    Args:
        template_file: Path of the template file handed to ``parse_template``.

    Returns:
        A ``(matched_channels, template_channels)`` tuple: the result of
        ``match_channels`` over the merged sources, and the parsed template.
    """
    template_channels = parse_template(template_file)

    # Merge the per-source results, concatenating lists of categories that
    # appear in more than one source.
    merged = OrderedDict()
    for source_url in config.source_urls:
        for category, entries in fetch_channels(source_url).items():
            if category not in merged:
                merged[category] = entries
            else:
                merged[category].extend(entries)

    return match_channels(template_channels, merged), template_channels
# URL scheme followed by a bracketed host made only of hex digits and colons,
# i.e. an IPv6 literal. Any scheme is accepted (http, https, rtsp, ...), not
# just plain http.
_IPV6_URL_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9+.\-]*://\[[0-9a-fA-F:]+\]')


def is_ipv6(url):
    """Return True when *url* addresses its host by a bracketed IPv6 literal.

    Args:
        url: URL string to inspect; an empty string yields ``False``.

    Returns:
        ``True`` if the URL starts with ``<scheme>://[<hex and colons>]``,
        otherwise ``False``.
    """
    return _IPV6_URL_RE.match(url) is not None
def updateChannelUrlsM3U(channels, template_channels):
    """Write the matched channels to ``live.m3u`` and ``live.txt``.

    Both files start with the announcement groups from ``config`` and are
    followed by the template categories in template order. Within a channel
    the URLs are ordered by ``config.ip_version_priority``; empty URLs, URLs
    containing a ``config.url_blacklist`` entry and URLs already written
    earlier in this call are skipped. Each written URL has any existing
    ``$...`` tail replaced by an IP-version/line label.

    Args:
        channels: Mapping of category -> mapping of channel name -> list of
            URLs.
        template_channels: Mapping of category -> list of channel names that
            defines which channels are written and in what order.
    """

    def ip_sort_key(candidate):
        # False sorts before True, so the preferred IP family comes first.
        v6 = is_ipv6(candidate)
        return (not v6) if config.ip_version_priority == "ipv6" else v6

    seen_urls = set()

    # Announcement entries without a name are labelled with today's date.
    # This updates the entries in config.announcements in place.
    today = datetime.now().strftime("%Y-%m-%d")
    for group in config.announcements:
        for entry in group['entries']:
            if entry['name'] is None:
                entry['name'] = today

    with open("live.m3u", "w", encoding="utf-8") as f_m3u:
        epg_list = ",".join(f'"{epg_url}"' for epg_url in config.epg_urls)
        f_m3u.write(f"#EXTM3U x-tvg-url={epg_list}\n")

        with open("live.txt", "w", encoding="utf-8") as f_txt:
            # Announcement groups come first in both output files.
            for group in config.announcements:
                f_txt.write(f"{group['channel']},#genre#\n")
                for entry in group['entries']:
                    f_m3u.write(f"""#EXTINF:-1 tvg-id="1" tvg-name="{entry['name']}" tvg-logo="{entry['logo']}" group-title="{group['channel']}",{entry['name']}\n""")
                    f_m3u.write(f"{entry['url']}\n")
                    f_txt.write(f"{entry['name']},{entry['url']}\n")

            for category, wanted_names in template_channels.items():
                # The category header is written even when nothing matched.
                f_txt.write(f"{category},#genre#\n")
                if category not in channels:
                    continue

                for channel_name in wanted_names:
                    if channel_name not in channels[category]:
                        continue

                    ordered_urls = sorted(channels[category][channel_name], key=ip_sort_key)

                    # The filter must stay a loop: seen_urls is updated as we
                    # go, so duplicates inside the same list are dropped too.
                    kept_urls = []
                    for url in ordered_urls:
                        if not url or url in seen_urls:
                            continue
                        if any(blacklist in url for blacklist in config.url_blacklist):
                            continue
                        kept_urls.append(url)
                        seen_urls.add(url)

                    only_one = len(kept_urls) == 1
                    for index, url in enumerate(kept_urls, start=1):
                        ip_label = "$LR•IPV6" if is_ipv6(url) else "$LR•IPV4"
                        # The line number is appended only when the channel
                        # has more than one URL.
                        url_suffix = ip_label if only_one else f"{ip_label}『线路{index}』"

                        # Drop anything from the first "$" on before
                        # appending our own label.
                        base_url = url.partition('$')[0]
                        new_url = f"{base_url}{url_suffix}"

                        f_m3u.write(f"#EXTINF:-1 tvg-id=\"{index}\" tvg-name=\"{channel_name}\" tvg-logo=\"https://gcore.jsdelivr.net/gh/yuanzl77/TVlogo@master/png/{channel_name}.png\" group-title=\"{category}\",{channel_name}\n")
                        f_m3u.write(new_url + "\n")
                        f_txt.write(f"{channel_name},{new_url}\n")

            # NOTE(review): the source this was restored from had lost its
            # indentation; the trailing blank line is assumed to be written
            # once, after all categories - confirm against the original file.
            f_txt.write("\n")
# Script entry point: match the configured sources against demo.txt, then
# write the resulting playlists.
if __name__ == "__main__":
    template_file = "demo.txt"
    matched, template = filter_source_urls(template_file)
    updateChannelUrlsM3U(matched, template)