diff --git a/jar/yt.jar b/jar/yt.jar new file mode 100755 index 00000000..29a28d68 Binary files /dev/null and b/jar/yt.jar differ diff --git a/py/get_iptv.py b/py/get_iptv.py index 6cd964b3..acf168d3 100755 --- a/py/get_iptv.py +++ b/py/get_iptv.py @@ -14,11 +14,11 @@ from urllib3.util.retry import Retry logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) -# 全局锁,用于文件写入(如果需要多线程写入时) +# 全局锁,用于文件写入 write_lock = threading.Lock() def get_session(): - """创建一个带有重试机制的requests Session""" + session = requests.Session() retry = Retry(connect=3, backoff_factor=0.5) adapter = HTTPAdapter(max_retries=retry) @@ -37,7 +37,6 @@ def load_urls_from_file(file_path): with open(file_path, "r", encoding="utf-8") as f: for line in f: line = line.strip() - # 忽略空行和以 # 开头的注释行 if line and not line.startswith("#"): urls.append(line) logger.info(f"从 {file_path} 加载了 {len(urls)} 个源") @@ -69,55 +68,43 @@ def parse_template(template_file): return template_channels def fetch_channels(url): - """从URL获取频道列表,支持M3U和TXT格式""" + """从URL获取频道列表""" channels = OrderedDict() session = get_session() try: response = session.get(url, timeout=30) response.raise_for_status() - response.encoding = response.apparent_encoding or "utf-8" # 自动检测编码 + response.encoding = response.apparent_encoding or "utf-8" lines = [line.strip() for line in response.text.splitlines() if line.strip()] if not lines: return channels - # 判断是否为 M3U 格式 (检查前几行) is_m3u = any("#EXTINF" in line for line in lines[:10]) if is_m3u: current_category = "默认分类" current_name = "未知频道" - # 预编译正则 re_group = re.compile(r'group-title="([^"]*)"') re_name = re.compile(r',([^,]*)$') for line in lines: if line.startswith("#EXTINF"): - # 提取分类 group_match = re_group.search(line) if group_match: current_category = group_match.group(1).strip() - - # 提取名称 name_match = re_name.search(line) if name_match: current_name = name_match.group(1).strip() - elif not line.startswith("#") and "://" in line: - # 这是一个URL行 if current_category not in channels: channels[current_category] = [] - - # 简单过滤无效名称 if current_name and current_name != "未知频道": channels[current_category].append((current_name, line)) - - # 重置名称,防止下一次使用旧名称 current_name = "未知频道" else: - # TXT 格式处理 (Genre,Name,URL) current_category = None for line in lines: if "#genre#" in line: @@ -133,23 +120,16 @@ def fetch_channels(url): return channels - except requests.exceptions.RequestException as e: - logger.error(f"请求 {url} 失败: {e}") - return OrderedDict() except Exception as e: - logger.error(f"处理 {url} 时发生未知错误: {e}") + logger.error(f"处理 {url} 时出错: {e}") return OrderedDict() def match_channels(template_channels, all_channels): - """ - 匹配频道逻辑优化版 - """ + matched = OrderedDict() unmatched_template = OrderedDict() - # 1. 数据扁平化预处理:将所有源频道放入一个大列表中,避免多层循环 - # 结构: (normalized_name, original_name, url, category) - # 这里的 normalized_name 用于不区分大小写的比对 + # 1. 数据扁平化 flattened_source_channels = [] for cat, chans in all_channels.items(): for name, url in chans: @@ -158,12 +138,12 @@ def match_channels(template_channels, all_channels): 'name': name, 'url': url, 'cat': cat, - 'key': f"{name}_{url}" # 用于去重的唯一键 + 'key': f"{name}_{url}" }) used_channel_keys = set() - # 初始化输出结构 + # 初始化 for cat in template_channels: matched[cat] = OrderedDict() unmatched_template[cat] = [] @@ -171,40 +151,36 @@ def match_channels(template_channels, all_channels): # 2. 匹配逻辑 for category, tmpl_names in template_channels.items(): for tmpl_name in tmpl_names: - # 解析变体: "CCTV1|CCTV-1" -> ["CCTV1", "CCTV-1"] + variants = [n.strip() for n in tmpl_name.split("|") if n.strip()] + primary_name = variants[0] + found_for_this_template = False - # 对每个变体进行匹配 for variant in variants: variant_lower = variant.lower() - # 在扁平化的源列表中搜索 - # 优化点:不再使用正则,而是使用字符串包含 (in) 或 精确匹配 - # 如果需要精确匹配优先,可以分两轮;这里保留原逻辑的"包含即匹配" - for src in flattened_source_channels: - # 检查是否已使用 if src['key'] in used_channel_keys: continue - # 核心匹配逻辑:源频道名称 包含 模板变体 - # 例如:模板 "CCTV-1" 匹配源 "CCTV-1 FHD" + # 匹配成功 if variant_lower in src['norm_name']: - if src['name'] not in matched[category]: - matched[category][src['name']] = [] + # 初始化该频道的列表 + if primary_name not in matched[category]: + matched[category][primary_name] = [] + + # 将源数据添加进去 + matched[category][primary_name].append((src['name'], src['url'])) - matched[category][src['name']].append((src['name'], src['url'])) used_channel_keys.add(src['key']) found_for_this_template = True - - # 注意:原代码逻辑没有 break,允许一个变体匹配多个源频道(多线路) if not found_for_this_template: unmatched_template[category].append(tmpl_name) - # 3. 找出源中完全未被使用的频道 + # 3. 找出源中未使用的频道 unmatched_source = OrderedDict() for src in flattened_source_channels: if src['key'] not in used_channel_keys: @@ -215,14 +191,14 @@ def match_channels(template_channels, all_channels): return matched, unmatched_template, unmatched_source def is_ipv6(url): - """检测是否为 IPv6 地址""" - # 简单的 IPv6 URL 检测: http://[2409:...] return "://[" in url def generate_outputs(channels, template_channels): - """生成 m3u 和 txt 文件""" + written_urls = set() - channel_counter = 1 + + # 确保输出目录存在 + os.makedirs("lib", exist_ok=True) output_m3u_path = "lib/iptv.m3u" output_txt_path = "lib/iptv.txt" @@ -240,9 +216,9 @@ def generate_outputs(channels, template_channels): txt.write(f"\n{category},#genre#\n") - # 遍历该分类下的匹配频道 for channel_key_name, channel_list in channels[category].items(): - # 去重逻辑:同一个频道名下,去除 URL 相同的 + + # 去重逻辑 unique_urls = [] seen_urls = set() @@ -254,86 +230,66 @@ def generate_outputs(channels, template_channels): total_lines = len(unique_urls) for idx, url in enumerate(unique_urls, 1): - # 生成后缀 - base_url = url.split("$")[0] # 清理可能已有的后缀 + base_url = url.split("$")[0] suffix_name = "IPV6" if is_ipv6(url) else "IPV4" - # 构造显示名称 + # 构造显示名称,强制使用模板的主名称 display_name = channel_key_name - # 构造最终 URL 标注 - # 格式:$LR•IPV4•2『线路1』 + # 构造后缀 meta_suffix = f"$LR•{suffix_name}" if total_lines > 1: meta_suffix += f"•{total_lines}『线路{idx}』" final_url = f"{base_url}{meta_suffix}" - # 写入 M3U - m3u.write(f'#EXTINF:-1 tvg-id="{channel_counter}" tvg-name="{channel_key_name}" group-title="{category}",{display_name}\n') + m3u.write(f'#EXTINF:-1 tvg-name="{display_name}" group-title="{category}",{display_name}\n') m3u.write(f"{final_url}\n") - # 写入 TXT txt.write(f"{display_name},{final_url}\n") - channel_counter += 1 - - logger.info(f"输出完成,共处理 {channel_counter - 1} 个有效频道。") + logger.info("输出完成。") except Exception as e: logger.error(f"写入文件失败: {e}") def generate_unmatched_report(unmatched_template, unmatched_source): - """生成未匹配报告""" - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # 确保配置目录存在 + os.makedirs("py/config", exist_ok=True) report_file = "py/config/iptv_test.txt" - total_template_lost = sum(len(v) for v in unmatched_template.values()) - total_source_lost = sum(len(v) for v in unmatched_source.values()) try: with open(report_file, "w", encoding="utf-8") as f: - f.write(f"# 未匹配频道报告 - {timestamp}\n") - f.write(f"# 模板未匹配数: {total_template_lost}\n") - f.write(f"# 源未利用频道数: {total_source_lost}\n\n") - - f.write("## 1. 模板中存在但源中未找到的频道 (建议从模板删除)\n") + f.write(f"# 未匹配报告 {datetime.now()}\n") + f.write(f"# 模板未匹配数: {total_template_lost}\n\n") + f.write("## 模板中有但源中无\n") for cat, names in unmatched_template.items(): if names: f.write(f"\n{cat},#genre#\n") - # 去重保留顺序 for name in list(OrderedDict.fromkeys(names)): f.write(f"{name},\n") - f.write("\n\n## 2. 源中存在但模板未收录的频道 (建议添加到模板)\n") + f.write("\n\n## 源中有但模板无\n") for cat, chans in unmatched_source.items(): if chans: f.write(f"\n{cat},#genre#\n") - # 只记录名称 unique_names = list(OrderedDict.fromkeys([c[0] for c in chans])) for name in unique_names: f.write(f"{name},\n") - - logger.info(f"未匹配报告已生成: {report_file}") return total_template_lost - except Exception as e: logger.error(f"生成报告失败: {e}") return 0 def remove_unmatched_from_template(template_file, unmatched_template): - """备份并更新模板,移除未匹配项""" backup_file = template_file + ".backup" try: shutil.copy2(template_file, backup_file) - logger.info(f"备份模板至: {backup_file}") - with open(template_file, "r", encoding="utf-8") as f: lines = f.readlines() - + new_lines = [] current_cat = None - - # 构建需删除集合以加快查找: {"央视": {"CCTV-99", ...}} to_remove = {cat: set(names) for cat, names in unmatched_template.items()} for line in lines: @@ -341,35 +297,30 @@ def remove_unmatched_from_template(template_file, unmatched_template): if not stripped or stripped.startswith("#"): new_lines.append(line) continue - if "#genre#" in stripped: current_cat = stripped.split(",")[0].strip() new_lines.append(line) continue - if current_cat: name = stripped.split(",")[0].strip() - # 检查是否在删除列表中 - # 注意:模板中可能是 "CCTV1|CCTV-1",未匹配列表中记录的是整串 if current_cat in to_remove and name in to_remove[current_cat]: - logger.info(f"移除无效频道: [{current_cat}] {name}") continue new_lines.append(line) with open(template_file, "w", encoding="utf-8") as f: f.writelines(new_lines) - - logger.info("模板文件更新完成。") - + logger.info("已移除无效频道") except Exception as e: logger.error(f"更新模板失败: {e}") def main(template_file, tv_urls): - # 1. 解析模板 + if not tv_urls: + logger.error("没有有效的直播源URL,程序退出。") + return + logger.info("开始解析模板...") template = parse_template(template_file) - # 2. 并发获取源数据 logger.info(f"开始从 {len(tv_urls)} 个源获取数据...") all_channels = OrderedDict() @@ -384,29 +335,19 @@ def main(template_file, tv_urls): if cat not in all_channels: all_channels[cat] = [] all_channels[cat].extend(chans) - logger.info(f"源 {url} 获取成功: {sum(len(v) for v in data.values())} 个频道") - else: - logger.warning(f"源 {url} 无数据") + logger.info(f"源 {url} 获取成功") except Exception as e: - logger.error(f"源 {url} 处理异常: {e}") + logger.error(f"源 {url} 异常: {e}") - # 3. 核心匹配 - logger.info("开始匹配频道...") + logger.info("开始匹配...") matched, unmatched_tmpl, unmatched_src = match_channels(template, all_channels) - # 4. 生成结果文件 - logger.info("生成播放列表文件...") generate_outputs(matched, template) - - # 5. 生成报告 lost_count = generate_unmatched_report(unmatched_tmpl, unmatched_src) - # 6. (可选) 自动清洗模板 if lost_count > 0: - logger.info(f"发现 {lost_count} 个模板频道未匹配,准备从模板中移除...") + logger.info(f"清理 {lost_count} 个无效频道...") remove_unmatched_from_template(template_file, unmatched_tmpl) - else: - logger.info("所有模板频道均匹配成功。") if __name__ == "__main__": # 配置区 @@ -415,4 +356,9 @@ if __name__ == "__main__": TV_URLS = load_urls_from_file(URLS_FILE) + # 备用源 + if not TV_URLS: + logger.warning("未从文件中加载到URL,使用空列表") + TV_URLS = [] + main(TEMPLATE_FILE, TV_URLS)