fun/py/get_iptv.py
2025-12-18 20:11:19 +08:00

364 lines
13 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import os
import requests
import logging
import shutil
import threading
from collections import OrderedDict
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Configure root logging: timestamped INFO-level messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Global lock serializing output-file writes (acquired in generate_outputs).
write_lock = threading.Lock()
def get_session():
    """Return a requests.Session whose HTTP and HTTPS adapters retry
    failed connections (3 attempts, 0.5s exponential backoff)."""
    sess = requests.Session()
    retry_policy = Retry(connect=3, backoff_factor=0.5)
    adapter = HTTPAdapter(max_retries=retry_policy)
    for scheme in ("http://", "https://"):
        sess.mount(scheme, adapter)
    return sess
def load_urls_from_file(file_path):
    """Read one source URL per line from *file_path*.

    Blank lines and lines starting with '#' are skipped. Returns an empty
    list (after logging) when the file is missing or unreadable.
    """
    urls = []
    if not os.path.exists(file_path):
        logger.warning(f"URL配置文件未找到: {file_path}")
        return urls
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            for raw in handle:
                entry = raw.strip()
                if not entry or entry.startswith("#"):
                    continue
                urls.append(entry)
        logger.info(f"{file_path} 加载了 {len(urls)} 个源")
    except Exception as e:
        logger.error(f"读取URL文件失败: {e}")
    return urls
def parse_template(template_file):
    """Parse the channel template into OrderedDict(category -> [channel names]).

    Category headers are lines containing '#genre#' (format: "<name>,#genre#");
    subsequent non-comment lines contribute their first comma-field as a
    channel name under the current category. A missing file is logged and an
    empty mapping returned.
    """
    template_channels = OrderedDict()
    current_category = None
    try:
        with open(template_file, "r", encoding="utf-8") as handle:
            for raw in handle:
                entry = raw.strip()
                # Skip blanks and comments.
                if not entry or entry.startswith("#"):
                    continue
                first_field = entry.split(",")[0].strip()
                if "#genre#" in entry:
                    current_category = first_field
                    template_channels[current_category] = []
                elif current_category:
                    # Channel line: only the name (first field) matters here.
                    template_channels[current_category].append(first_field)
    except FileNotFoundError:
        logger.error(f"模板文件未找到: {template_file}")
    return template_channels
def fetch_channels(url):
    """Fetch and parse a channel list from *url*.

    Supports two formats, auto-detected from the first 10 non-blank lines:
    M3U (lines with #EXTINF metadata) and plain "name,url" text with
    ",#genre#" category headers.

    Returns:
        OrderedDict mapping category -> list of (channel_name, stream_url).
        On any network/parse error the error is logged and an empty
        OrderedDict is returned.
    """
    channels = OrderedDict()
    session = get_session()
    try:
        response = session.get(url, timeout=30)
        response.raise_for_status()
        # apparent_encoding sniffs the body; fall back to UTF-8 if unknown.
        response.encoding = response.apparent_encoding or "utf-8"
        lines = [line.strip() for line in response.text.splitlines() if line.strip()]
        if not lines:
            return channels
        # Probe only the head of the payload for the M3U marker.
        is_m3u = any("#EXTINF" in line for line in lines[:10])
        if is_m3u:
            current_category = "默认分类"
            current_name = "未知频道"
            re_group = re.compile(r'group-title="([^"]*)"')
            re_name = re.compile(r',([^,]*)$')
            for line in lines:
                if line.startswith("#EXTINF"):
                    group_match = re_group.search(line)
                    if group_match:
                        current_category = group_match.group(1).strip()
                    name_match = re_name.search(line)
                    if name_match:
                        current_name = name_match.group(1).strip()
                elif not line.startswith("#") and "://" in line:
                    if current_category not in channels:
                        channels[current_category] = []
                    # Only record streams preceded by a named #EXTINF line.
                    if current_name and current_name != "未知频道":
                        channels[current_category].append((current_name, line))
                    current_name = "未知频道"
        else:
            current_category = None
            for line in lines:
                if "#genre#" in line:
                    current_category = line.split(",")[0].strip()
                    if current_category not in channels:
                        channels[current_category] = []
                elif current_category and "," in line:
                    parts = line.split(",", 1)
                    if len(parts) == 2:
                        # BUG FIX: the original unpacked into `name, url`,
                        # shadowing the `url` parameter — the except handler
                        # below would then log a stream URL instead of the
                        # source URL. Use distinct local names.
                        chan_name, chan_url = parts
                        if chan_name.strip() and chan_url.strip():
                            channels[current_category].append((chan_name.strip(), chan_url.strip()))
        return channels
    except Exception as e:
        logger.error(f"处理 {url} 时出错: {e}")
        return OrderedDict()
def match_channels(template_channels, all_channels):
    """Match template channel names against every fetched source channel.

    Each template entry may hold "|"-separated aliases; the first alias is
    the display name. A source channel matches when an alias appears as a
    case-insensitive substring of its name, and each source entry is
    consumed at most once.

    Returns:
        (matched, unmatched_template, unmatched_source) where
        matched:            OrderedDict cat -> OrderedDict name -> [(src_name, url)]
        unmatched_template: OrderedDict cat -> [template names with no source]
        unmatched_source:   OrderedDict cat -> [(src_name, url)] never consumed
    """
    # Flatten the nested source mapping into flat records for linear scans.
    sources = [
        {
            "norm_name": ch_name.lower(),
            "name": ch_name,
            "url": ch_url,
            "cat": src_cat,
            "key": f"{ch_name}_{ch_url}",
        }
        for src_cat, entries in all_channels.items()
        for ch_name, ch_url in entries
    ]
    consumed = set()
    matched = OrderedDict((cat, OrderedDict()) for cat in template_channels)
    unmatched_template = OrderedDict((cat, []) for cat in template_channels)
    for cat, tmpl_names in template_channels.items():
        for raw_name in tmpl_names:
            aliases = [a.strip() for a in raw_name.split("|") if a.strip()]
            display = aliases[0]
            hit = False
            for alias in aliases:
                needle = alias.lower()
                for rec in sources:
                    if rec["key"] in consumed or needle not in rec["norm_name"]:
                        continue
                    matched[cat].setdefault(display, []).append((rec["name"], rec["url"]))
                    consumed.add(rec["key"])
                    hit = True
            if not hit:
                unmatched_template[cat].append(raw_name)
    # Whatever was never consumed goes into the "source-only" report.
    unmatched_source = OrderedDict()
    for rec in sources:
        if rec["key"] not in consumed:
            unmatched_source.setdefault(rec["cat"], []).append((rec["name"], rec["url"]))
    return matched, unmatched_template, unmatched_source
def is_ipv6(url):
    """Heuristic: a bracketed host ("://[") marks a literal IPv6 address."""
    return url.find("://[") != -1
def generate_outputs(channels, template_channels):
    """Write the matched channels to lib/iptv.m3u and lib/iptv.txt.

    Categories are emitted in template order; URLs are de-duplicated
    globally across the whole run, each line tagged with an IPV4/IPV6
    suffix and, for multi-line channels, a line index. File writes are
    serialized with the module-level write_lock.
    """
    os.makedirs("lib", exist_ok=True)
    output_m3u_path = "lib/iptv.m3u"
    output_txt_path = "lib/iptv.txt"
    emitted = set()  # every URL written so far (global de-dup)
    try:
        with write_lock, \
                open(output_m3u_path, "w", encoding="utf-8") as m3u, \
                open(output_txt_path, "w", encoding="utf-8") as txt:
            m3u.write("#EXTM3U\n")
            for category in template_channels:
                per_cat = channels.get(category)
                if not per_cat:
                    continue
                txt.write(f"\n{category},#genre#\n")
                for display_name, entries in per_cat.items():
                    # Keep only URLs not yet written anywhere.
                    fresh = []
                    for _, candidate in entries:
                        if candidate in emitted:
                            continue
                        emitted.add(candidate)
                        fresh.append(candidate)
                    line_count = len(fresh)
                    for idx, candidate in enumerate(fresh, 1):
                        # Strip any existing "$..." metadata before re-tagging.
                        base_url = candidate.split("$")[0]
                        suffix_name = "IPV6" if is_ipv6(candidate) else "IPV4"
                        meta_suffix = f"$LR•{suffix_name}"
                        if line_count > 1:
                            meta_suffix += f"{line_count}『线路{idx}"
                        final_url = f"{base_url}{meta_suffix}"
                        m3u.write(f'#EXTINF:-1 tvg-name="{display_name}" group-title="{category}",{display_name}\n')
                        m3u.write(f"{final_url}\n")
                        txt.write(f"{display_name},{final_url}\n")
        logger.info("输出完成。")
    except Exception as e:
        logger.error(f"写入文件失败: {e}")
def generate_unmatched_report(unmatched_template, unmatched_source):
    """Write py/config/iptv_test.txt listing channels that failed to match.

    Two sections: template names with no source, and source channels no
    template wanted. Names are de-duplicated per category, order preserved.

    Returns:
        The total count of unmatched template entries (pre-dedup), or 0 if
        writing the report failed.
    """
    os.makedirs("py/config", exist_ok=True)
    report_file = "py/config/iptv_test.txt"
    total_template_lost = sum(len(names) for names in unmatched_template.values())
    try:
        with open(report_file, "w", encoding="utf-8") as report:
            report.write(f"# 未匹配报告 {datetime.now()}\n")
            report.write(f"# 模板未匹配数: {total_template_lost}\n\n")
            report.write("## 模板中有但源中无\n")
            for cat, names in unmatched_template.items():
                if not names:
                    continue
                report.write(f"\n{cat},#genre#\n")
                # fromkeys preserves first-seen order while de-duplicating.
                for name in OrderedDict.fromkeys(names):
                    report.write(f"{name},\n")
            report.write("\n\n## 源中有但模板无\n")
            for cat, entries in unmatched_source.items():
                if not entries:
                    continue
                report.write(f"\n{cat},#genre#\n")
                for name in OrderedDict.fromkeys(entry[0] for entry in entries):
                    report.write(f"{name},\n")
        return total_template_lost
    except Exception as e:
        logger.error(f"生成报告失败: {e}")
        return 0
def remove_unmatched_from_template(template_file, unmatched_template):
    """Rewrite the template file without the channels that had no source.

    A full copy is saved to "<template_file>.backup" before modification.
    Blank lines, comments, and category headers are always kept; a channel
    line is dropped only when its name is listed for its own category.
    Failures are logged, leaving the original file untouched where possible.
    """
    backup_file = template_file + ".backup"
    try:
        shutil.copy2(template_file, backup_file)
        with open(template_file, "r", encoding="utf-8") as handle:
            original_lines = handle.readlines()
        removal_map = {cat: set(names) for cat, names in unmatched_template.items()}
        kept = []
        active_category = None
        for raw in original_lines:
            text = raw.strip()
            if not text or text.startswith("#"):
                kept.append(raw)
                continue
            head = text.split(",")[0].strip()
            if "#genre#" in text:
                active_category = head
                kept.append(raw)
                continue
            # Drop the line only when it is flagged under the current category.
            if active_category and head in removal_map.get(active_category, ()):
                continue
            kept.append(raw)
        with open(template_file, "w", encoding="utf-8") as handle:
            handle.writelines(kept)
        logger.info("已移除无效频道")
    except Exception as e:
        logger.error(f"更新模板失败: {e}")
def main(template_file, tv_urls):
    """Orchestrate the pipeline: parse template, fetch all sources
    concurrently, match against the template, write outputs and the
    unmatched report, then prune dead entries from the template."""
    if not tv_urls:
        logger.error("没有有效的直播源URL程序退出。")
        return
    logger.info("开始解析模板...")
    template = parse_template(template_file)
    logger.info(f"开始从 {len(tv_urls)} 个源获取数据...")
    all_channels = OrderedDict()
    with ThreadPoolExecutor(max_workers=5) as executor:
        pending = {executor.submit(fetch_channels, src): src for src in tv_urls}
        for done in as_completed(pending):
            src = pending[done]
            try:
                result = done.result()
                if result:
                    # Merge this source's categories into the global pool.
                    for cat, entries in result.items():
                        all_channels.setdefault(cat, []).extend(entries)
                    logger.info(f"{src} 获取成功")
            except Exception as e:
                logger.error(f"{src} 异常: {e}")
    logger.info("开始匹配...")
    matched, unmatched_tmpl, unmatched_src = match_channels(template, all_channels)
    generate_outputs(matched, template)
    lost_count = generate_unmatched_report(unmatched_tmpl, unmatched_src)
    if lost_count > 0:
        logger.info(f"清理 {lost_count} 个无效频道...")
        remove_unmatched_from_template(template_file, unmatched_tmpl)
if __name__ == "__main__":
    # Configuration: template of wanted channels and the source-URL list.
    TEMPLATE_FILE = "py/config/iptv.txt"
    URLS_FILE = "py/config/urls.txt"
    TV_URLS = load_urls_from_file(URLS_FILE)
    # Fallback: run with an empty list; main() logs an error and returns.
    if not TV_URLS:
        logger.warning("未从文件中加载到URL使用空列表")
        TV_URLS = []
    main(TEMPLATE_FILE, TV_URLS)