Update Up 12-18 20:11

This commit is contained in:
cluntop 2025-12-18 20:11:19 +08:00
parent 286a3d3478
commit 6bb4dc6059
2 changed files with 52 additions and 106 deletions

BIN
jar/yt.jar Executable file

Binary file not shown.

View file

@ -14,11 +14,11 @@ from urllib3.util.retry import Retry
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 全局锁,用于文件写入(如果需要多线程写入时)
# 全局锁,用于文件写入
write_lock = threading.Lock()
def get_session():
"""创建一个带有重试机制的requests Session"""
session = requests.Session()
retry = Retry(connect=3, backoff_factor=0.5)
adapter = HTTPAdapter(max_retries=retry)
@ -37,7 +37,6 @@ def load_urls_from_file(file_path):
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
# 忽略空行和以 # 开头的注释行
if line and not line.startswith("#"):
urls.append(line)
logger.info(f"{file_path} 加载了 {len(urls)} 个源")
@ -69,55 +68,43 @@ def parse_template(template_file):
return template_channels
def fetch_channels(url):
"""从URL获取频道列表支持M3U和TXT格式"""
"""从URL获取频道列表"""
channels = OrderedDict()
session = get_session()
try:
response = session.get(url, timeout=30)
response.raise_for_status()
response.encoding = response.apparent_encoding or "utf-8" # 自动检测编码
response.encoding = response.apparent_encoding or "utf-8"
lines = [line.strip() for line in response.text.splitlines() if line.strip()]
if not lines:
return channels
# 判断是否为 M3U 格式 (检查前几行)
is_m3u = any("#EXTINF" in line for line in lines[:10])
if is_m3u:
current_category = "默认分类"
current_name = "未知频道"
# 预编译正则
re_group = re.compile(r'group-title="([^"]*)"')
re_name = re.compile(r',([^,]*)$')
for line in lines:
if line.startswith("#EXTINF"):
# 提取分类
group_match = re_group.search(line)
if group_match:
current_category = group_match.group(1).strip()
# 提取名称
name_match = re_name.search(line)
if name_match:
current_name = name_match.group(1).strip()
elif not line.startswith("#") and "://" in line:
# 这是一个URL行
if current_category not in channels:
channels[current_category] = []
# 简单过滤无效名称
if current_name and current_name != "未知频道":
channels[current_category].append((current_name, line))
# 重置名称,防止下一次使用旧名称
current_name = "未知频道"
else:
# TXT 格式处理 (Genre,Name,URL)
current_category = None
for line in lines:
if "#genre#" in line:
@ -133,23 +120,16 @@ def fetch_channels(url):
return channels
except requests.exceptions.RequestException as e:
logger.error(f"请求 {url} 失败: {e}")
return OrderedDict()
except Exception as e:
logger.error(f"处理 {url}发生未知错误: {e}")
logger.error(f"处理 {url} 时出错: {e}")
return OrderedDict()
def match_channels(template_channels, all_channels):
"""
匹配频道逻辑优化版
"""
matched = OrderedDict()
unmatched_template = OrderedDict()
# 1. 数据扁平化预处理:将所有源频道放入一个大列表中,避免多层循环
# 结构: (normalized_name, original_name, url, category)
# 这里的 normalized_name 用于不区分大小写的比对
# 1. 数据扁平化
flattened_source_channels = []
for cat, chans in all_channels.items():
for name, url in chans:
@ -158,12 +138,12 @@ def match_channels(template_channels, all_channels):
'name': name,
'url': url,
'cat': cat,
'key': f"{name}_{url}" # 用于去重的唯一键
'key': f"{name}_{url}"
})
used_channel_keys = set()
# 初始化输出结构
# 初始化
for cat in template_channels:
matched[cat] = OrderedDict()
unmatched_template[cat] = []
@ -171,40 +151,36 @@ def match_channels(template_channels, all_channels):
# 2. 匹配逻辑
for category, tmpl_names in template_channels.items():
for tmpl_name in tmpl_names:
# 解析变体: "CCTV1|CCTV-1" -> ["CCTV1", "CCTV-1"]
variants = [n.strip() for n in tmpl_name.split("|") if n.strip()]
primary_name = variants[0]
found_for_this_template = False
# 对每个变体进行匹配
for variant in variants:
variant_lower = variant.lower()
# 在扁平化的源列表中搜索
# 优化点:不再使用正则,而是使用字符串包含 (in) 或 精确匹配
# 如果需要精确匹配优先,可以分两轮;这里保留原逻辑的"包含即匹配"
for src in flattened_source_channels:
# 检查是否已使用
if src['key'] in used_channel_keys:
continue
# 核心匹配逻辑:源频道名称 包含 模板变体
# 例如:模板 "CCTV-1" 匹配源 "CCTV-1 FHD"
# 匹配成功
if variant_lower in src['norm_name']:
if src['name'] not in matched[category]:
matched[category][src['name']] = []
# 初始化该频道的列表
if primary_name not in matched[category]:
matched[category][primary_name] = []
# 将源数据添加进去
matched[category][primary_name].append((src['name'], src['url']))
matched[category][src['name']].append((src['name'], src['url']))
used_channel_keys.add(src['key'])
found_for_this_template = True
# 注意:原代码逻辑没有 break允许一个变体匹配多个源频道多线路
if not found_for_this_template:
unmatched_template[category].append(tmpl_name)
# 3. 找出源中完全未被使用的频道
# 3. 找出源中使用的频道
unmatched_source = OrderedDict()
for src in flattened_source_channels:
if src['key'] not in used_channel_keys:
@ -215,14 +191,14 @@ def match_channels(template_channels, all_channels):
return matched, unmatched_template, unmatched_source
def is_ipv6(url):
"""检测是否为 IPv6 地址"""
# 简单的 IPv6 URL 检测: http://[2409:...]
return "://[" in url
def generate_outputs(channels, template_channels):
"""生成 m3u 和 txt 文件"""
written_urls = set()
channel_counter = 1
# 确保输出目录存在
os.makedirs("lib", exist_ok=True)
output_m3u_path = "lib/iptv.m3u"
output_txt_path = "lib/iptv.txt"
@ -240,9 +216,9 @@ def generate_outputs(channels, template_channels):
txt.write(f"\n{category},#genre#\n")
# 遍历该分类下的匹配频道
for channel_key_name, channel_list in channels[category].items():
# 去重逻辑:同一个频道名下,去除 URL 相同的
# 去重逻辑
unique_urls = []
seen_urls = set()
@ -254,86 +230,66 @@ def generate_outputs(channels, template_channels):
total_lines = len(unique_urls)
for idx, url in enumerate(unique_urls, 1):
# 生成后缀
base_url = url.split("$")[0] # 清理可能已有的后缀
base_url = url.split("$")[0]
suffix_name = "IPV6" if is_ipv6(url) else "IPV4"
# 构造显示名称
# 构造显示名称,强制使用模板的主名称
display_name = channel_key_name
# 构造最终 URL 标注
# 格式:$LR•IPV4•2『线路1』
# 构造后缀
meta_suffix = f"$LR•{suffix_name}"
if total_lines > 1:
meta_suffix += f"{total_lines}『线路{idx}"
final_url = f"{base_url}{meta_suffix}"
# 写入 M3U
m3u.write(f'#EXTINF:-1 tvg-id="{channel_counter}" tvg-name="{channel_key_name}" group-title="{category}",{display_name}\n')
m3u.write(f'#EXTINF:-1 tvg-name="{display_name}" group-title="{category}",{display_name}\n')
m3u.write(f"{final_url}\n")
# 写入 TXT
txt.write(f"{display_name},{final_url}\n")
channel_counter += 1
logger.info(f"输出完成,共处理 {channel_counter - 1} 个有效频道。")
logger.info("输出完成。")
except Exception as e:
logger.error(f"写入文件失败: {e}")
def generate_unmatched_report(unmatched_template, unmatched_source):
"""生成未匹配报告"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 确保配置目录存在
os.makedirs("py/config", exist_ok=True)
report_file = "py/config/iptv_test.txt"
total_template_lost = sum(len(v) for v in unmatched_template.values())
total_source_lost = sum(len(v) for v in unmatched_source.values())
try:
with open(report_file, "w", encoding="utf-8") as f:
f.write(f"# 未匹配频道报告 - {timestamp}\n")
f.write(f"# 模板未匹配数: {total_template_lost}\n")
f.write(f"# 源未利用频道数: {total_source_lost}\n\n")
f.write("## 1. 模板中存在但源中未找到的频道 (建议从模板删除)\n")
f.write(f"# 未匹配报告 {datetime.now()}\n")
f.write(f"# 模板未匹配数: {total_template_lost}\n\n")
f.write("## 模板中有但源中无\n")
for cat, names in unmatched_template.items():
if names:
f.write(f"\n{cat},#genre#\n")
# 去重保留顺序
for name in list(OrderedDict.fromkeys(names)):
f.write(f"{name},\n")
f.write("\n\n## 2. 源中存在但模板未收录的频道 (建议添加到模板)\n")
f.write("\n\n## 源中有但模板无\n")
for cat, chans in unmatched_source.items():
if chans:
f.write(f"\n{cat},#genre#\n")
# 只记录名称
unique_names = list(OrderedDict.fromkeys([c[0] for c in chans]))
for name in unique_names:
f.write(f"{name},\n")
logger.info(f"未匹配报告已生成: {report_file}")
return total_template_lost
except Exception as e:
logger.error(f"生成报告失败: {e}")
return 0
def remove_unmatched_from_template(template_file, unmatched_template):
"""备份并更新模板,移除未匹配项"""
backup_file = template_file + ".backup"
try:
shutil.copy2(template_file, backup_file)
logger.info(f"备份模板至: {backup_file}")
with open(template_file, "r", encoding="utf-8") as f:
lines = f.readlines()
new_lines = []
current_cat = None
# 构建需删除集合以加快查找: {"央视": {"CCTV-99", ...}}
to_remove = {cat: set(names) for cat, names in unmatched_template.items()}
for line in lines:
@ -341,35 +297,30 @@ def remove_unmatched_from_template(template_file, unmatched_template):
if not stripped or stripped.startswith("#"):
new_lines.append(line)
continue
if "#genre#" in stripped:
current_cat = stripped.split(",")[0].strip()
new_lines.append(line)
continue
if current_cat:
name = stripped.split(",")[0].strip()
# 检查是否在删除列表中
# 注意:模板中可能是 "CCTV1|CCTV-1",未匹配列表中记录的是整串
if current_cat in to_remove and name in to_remove[current_cat]:
logger.info(f"移除无效频道: [{current_cat}] {name}")
continue
new_lines.append(line)
with open(template_file, "w", encoding="utf-8") as f:
f.writelines(new_lines)
logger.info("模板文件更新完成。")
logger.info("已移除无效频道")
except Exception as e:
logger.error(f"更新模板失败: {e}")
def main(template_file, tv_urls):
# 1. 解析模板
if not tv_urls:
logger.error("没有有效的直播源URL程序退出。")
return
logger.info("开始解析模板...")
template = parse_template(template_file)
# 2. 并发获取源数据
logger.info(f"开始从 {len(tv_urls)} 个源获取数据...")
all_channels = OrderedDict()
@ -384,29 +335,19 @@ def main(template_file, tv_urls):
if cat not in all_channels:
all_channels[cat] = []
all_channels[cat].extend(chans)
logger.info(f"{url} 获取成功: {sum(len(v) for v in data.values())} 个频道")
else:
logger.warning(f"{url} 无数据")
logger.info(f"{url} 获取成功")
except Exception as e:
logger.error(f"{url} 处理异常: {e}")
logger.error(f"{url} 异常: {e}")
# 3. 核心匹配
logger.info("开始匹配频道...")
logger.info("开始匹配...")
matched, unmatched_tmpl, unmatched_src = match_channels(template, all_channels)
# 4. 生成结果文件
logger.info("生成播放列表文件...")
generate_outputs(matched, template)
# 5. 生成报告
lost_count = generate_unmatched_report(unmatched_tmpl, unmatched_src)
# 6. (可选) 自动清洗模板
if lost_count > 0:
logger.info(f"发现 {lost_count} 个模板频道未匹配,准备从模板中移除...")
logger.info(f"清理 {lost_count} 个无效频道...")
remove_unmatched_from_template(template_file, unmatched_tmpl)
else:
logger.info("所有模板频道均匹配成功。")
if __name__ == "__main__":
# 配置区
@ -415,4 +356,9 @@ if __name__ == "__main__":
TV_URLS = load_urls_from_file(URLS_FILE)
# 备用源
if not TV_URLS:
logger.warning("未从文件中加载到URL使用空列表")
TV_URLS = []
main(TEMPLATE_FILE, TV_URLS)