import logging
import os
import re
import shutil
import threading
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Global lock guarding writes to the output files
write_lock = threading.Lock()

# Placeholder used while no usable #EXTINF name is pending in an M3U playlist
_UNKNOWN_NAME = "未知频道"
# Category assigned to M3U entries seen before any group-title attribute
_DEFAULT_CATEGORY = "默认分类"
_RE_GROUP_TITLE = re.compile(r'group-title="([^"]*)"')
_RE_EXTINF_NAME = re.compile(r',([^,]*)$')


def get_session():
    """Return a ``requests`` session that retries failed connections.

    The ``http://`` and ``https://`` prefixes are both mounted on one adapter
    configured with ``Retry(connect=3, backoff_factor=0.5)``.
    """
    session = requests.Session()
    retry = Retry(connect=3, backoff_factor=0.5)
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session


def load_urls_from_file(file_path):
    """Load the list of source URLs from a text file.

    Blank lines and lines starting with ``#`` are ignored.

    Returns:
        A list of stripped lines; empty when the file does not exist or
        cannot be read (the problem is logged, not raised).
    """
    urls = []
    if not os.path.exists(file_path):
        logger.warning(f"URL配置文件未找到: {file_path}")
        return urls

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    urls.append(line)
        logger.info(f"从 {file_path} 加载了 {len(urls)} 个源")
    except Exception as e:
        logger.error(f"读取URL文件失败: {e}")
    return urls


def parse_template(template_file):
    """Parse the template file into ``{category: [channel_name, ...]}``.

    A line containing ``#genre#`` starts a category (the text before the first
    comma is its name); every following non-comment line contributes the text
    before its first comma as a channel name.

    Returns:
        An ``OrderedDict`` in file order; empty when the file is missing
        (the error is logged, not raised).
    """
    template_channels = OrderedDict()
    current_category = None

    try:
        with open(template_file, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue

                if "#genre#" in line:
                    current_category = line.split(",")[0].strip()
                    # setdefault: a repeated category header must extend the
                    # existing list rather than discard what was collected.
                    template_channels.setdefault(current_category, [])
                elif current_category:
                    channel_name = line.split(",")[0].strip()
                    template_channels[current_category].append(channel_name)
    except FileNotFoundError:
        logger.error(f"模板文件未找到: {template_file}")

    return template_channels


def _parse_m3u(lines):
    """Parse stripped M3U playlist lines into ``{category: [(name, url), ...]}``."""
    channels = OrderedDict()
    current_category = _DEFAULT_CATEGORY
    current_name = _UNKNOWN_NAME

    for line in lines:
        if line.startswith("#EXTINF"):
            group_match = _RE_GROUP_TITLE.search(line)
            if group_match:
                current_category = group_match.group(1).strip()
            name_match = _RE_EXTINF_NAME.search(line)
            if name_match:
                current_name = name_match.group(1).strip()
        elif not line.startswith("#") and "://" in line:
            if current_category not in channels:
                channels[current_category] = []
            if current_name and current_name != _UNKNOWN_NAME:
                channels[current_category].append((current_name, line))
            # A name applies to a single stream line only.
            current_name = _UNKNOWN_NAME

    return channels


def _parse_txt(lines):
    """Parse stripped ``name,url`` / ``category,#genre#`` lines into ``{category: [(name, url), ...]}``."""
    channels = OrderedDict()
    current_category = None

    for line in lines:
        if "#genre#" in line:
            current_category = line.split(",")[0].strip()
            channels.setdefault(current_category, [])
        elif current_category and "," in line:
            name, stream_url = line.split(",", 1)
            name = name.strip()
            stream_url = stream_url.strip()
            if name and stream_url:
                channels[current_category].append((name, stream_url))

    return channels


def fetch_channels(url):
    """Download one source and parse it into ``{category: [(name, url), ...]}``.

    The payload is treated as M3U when ``#EXTINF`` occurs in any of its first
    ten non-empty lines, otherwise as the ``name,url`` text format.

    Returns:
        An ``OrderedDict``; empty when the source is empty or when any error
        occurs (the error is logged together with the source URL).
    """
    try:
        # The context manager closes the session's pooled connections.
        with get_session() as session:
            response = session.get(url, timeout=30)
            response.raise_for_status()
            response.encoding = response.apparent_encoding or "utf-8"
            text = response.text

        lines = [line.strip() for line in text.splitlines() if line.strip()]
        if not lines:
            return OrderedDict()

        is_m3u = any("#EXTINF" in line for line in lines[:10])
        return _parse_m3u(lines) if is_m3u else _parse_txt(lines)
    except Exception as e:
        logger.error(f"处理 {url} 时出错: {e}")
        return OrderedDict()


def match_channels(template_channels, all_channels):
    """Assign source channels to template entries.

    A template name may list several aliases separated by ``|``; the first
    alias is the name used in the result. A source channel matches an alias
    when its lower-cased name contains the lower-cased alias followed by
    end-of-string or a character other than ``a-z``, ``0-9`` and ``+``. Each
    source channel (identified by name + URL) is used at most once.

    Args:
        template_channels: ``{category: [template_name, ...]}``.
        all_channels: ``{category: [(name, url), ...]}`` from the sources.

    Returns:
        A tuple ``(matched, unmatched_template, unmatched_source)`` where
        ``matched`` is ``{category: {primary_name: [(name, url), ...]}}``,
        ``unmatched_template`` is ``{category: [template_name, ...]}`` and
        ``unmatched_source`` is ``{source_category: [(name, url), ...]}``.
    """
    matched = OrderedDict()
    unmatched_template = OrderedDict()

    # 1. Flatten the source data
    flattened_source_channels = []
    for cat, chans in all_channels.items():
        for name, url in chans:
            flattened_source_channels.append({
                'norm_name': name.lower(),
                'name': name,
                'url': url,
                'cat': cat,
                'key': f"{name}_{url}",
            })

    used_channel_keys = set()

    # Initialise one slot per template category
    for cat in template_channels:
        matched[cat] = OrderedDict()
        unmatched_template[cat] = []

    # 2. Matching
    for category, tmpl_names in template_channels.items():
        for tmpl_name in tmpl_names:
            variants_raw = [n.strip() for n in tmpl_name.split("|") if n.strip()]
            variants = list(OrderedDict.fromkeys(variants_raw))
            if not variants:
                # Empty name (or only separators): nothing to match against.
                continue
            primary_name = variants[0]

            found_for_this_template = False

            for variant in variants:
                variant_lower = variant.lower()
                # Boundary check instead of plain substring containment.
                pattern = re.compile(re.escape(variant_lower) + r'($|[^a-z0-9\+])')

                for src in flattened_source_channels:
                    if src['key'] in used_channel_keys:
                        continue

                    if pattern.search(src['norm_name']):
                        if primary_name not in matched[category]:
                            matched[category][primary_name] = []

                        matched[category][primary_name].append((src['name'], src['url']))
                        used_channel_keys.add(src['key'])
                        found_for_this_template = True

            if not found_for_this_template:
                unmatched_template[category].append(tmpl_name)

    # 3. Collect the source channels that were never used
    unmatched_source = OrderedDict()
    for src in flattened_source_channels:
        if src['key'] not in used_channel_keys:
            if src['cat'] not in unmatched_source:
                unmatched_source[src['cat']] = []
            unmatched_source[src['cat']].append((src['name'], src['url']))

    return matched, unmatched_template, unmatched_source


def is_ipv6(url):
    """Return True when *url* contains ``://[`` (a bracketed host literal)."""
    return "://[" in url


def generate_outputs(channels, template_channels):
    """Write ``lib/iptv.m3u`` and ``lib/iptv.txt``, grouping multiple lines per channel.

    Categories are emitted in template order. Each URL is written at most once
    across the whole output. Everything after the first ``$`` in a URL is
    replaced by a generated ``$LR•IPV4``/``$LR•IPV6`` suffix, extended with the
    line count and line index when a channel has more than one URL.

    Args:
        channels: ``{category: {display_name: [(name, url), ...]}}``.
        template_channels: mapping whose key order defines the category order.

    Errors while writing are logged, not raised.
    """
    written_urls = set()

    # Make sure the output directory exists
    os.makedirs("lib", exist_ok=True)
    output_m3u_path = "lib/iptv.m3u"
    output_txt_path = "lib/iptv.txt"

    try:
        with write_lock:
            with open(output_m3u_path, "w", encoding="utf-8") as m3u, \
                    open(output_txt_path, "w", encoding="utf-8") as txt:

                m3u.write("#EXTM3U\n")

                for category in template_channels:
                    if category not in channels or not channels[category]:
                        continue

                    txt.write(f"\n{category},#genre#\n")

                    for channel_key_name, channel_list in channels[category].items():
                        # De-duplicate; written_urls also covers repeats
                        # within this channel because it is updated as we go.
                        unique_urls = []
                        for _, url in channel_list:
                            if url not in written_urls:
                                unique_urls.append(url)
                                written_urls.add(url)

                        total_lines = len(unique_urls)

                        for idx, url in enumerate(unique_urls, 1):
                            base_url = url.split("$")[0]
                            suffix_name = "IPV6" if is_ipv6(url) else "IPV4"

                            # Always use the template's primary name
                            display_name = channel_key_name

                            # Build the suffix
                            meta_suffix = f"$LR•{suffix_name}"
                            if total_lines > 1:
                                meta_suffix += f"•{total_lines}『线路{idx}』"

                            final_url = f"{base_url}{meta_suffix}"

                            m3u.write(f'#EXTINF:-1 tvg-name="{display_name}" group-title="{category}",{display_name}\n')
                            m3u.write(f"{final_url}\n")
                            txt.write(f"{display_name},{final_url}\n")

        logger.info("输出完成。")
    except Exception as e:
        logger.error(f"写入文件失败: {e}")


def generate_unmatched_report(unmatched_template, unmatched_source):
    """Write the unmatched-channel report to ``py/config/iptv_test.txt``.

    Args:
        unmatched_template: ``{category: [template_name, ...]}``.
        unmatched_source: ``{category: [(name, url), ...]}``.

    Returns:
        The total number of unmatched template entries (duplicates included),
        or 0 when writing the report fails (the error is logged).
    """
    # Make sure the config directory exists
    os.makedirs("py/config", exist_ok=True)
    report_file = "py/config/iptv_test.txt"

    total_template_lost = sum(len(v) for v in unmatched_template.values())

    try:
        with open(report_file, "w", encoding="utf-8") as f:
            f.write(f"# 未匹配报告 {datetime.now()}\n")
            f.write(f"# 模板未匹配数: {total_template_lost}\n\n")

            f.write("## 模板中有但源中无\n")
            for cat, names in unmatched_template.items():
                if names:
                    f.write(f"\n{cat},#genre#\n")
                    # fromkeys de-duplicates while keeping first-seen order
                    for name in OrderedDict.fromkeys(names):
                        f.write(f"{name},\n")

            f.write("\n\n## 源中有但模板无\n")
            for cat, chans in unmatched_source.items():
                if chans:
                    f.write(f"\n{cat},#genre#\n")
                    for name in OrderedDict.fromkeys(c[0] for c in chans):
                        f.write(f"{name},\n")

        return total_template_lost
    except Exception as e:
        logger.error(f"生成报告失败: {e}")
        return 0


def remove_unmatched_from_template(template_file, unmatched_template):
    """Rewrite the template without the entries listed in *unmatched_template*.

    The current file is first copied to ``<template_file>.backup``. Blank
    lines, ``#`` comment lines and category headers are always kept; a channel
    line is dropped when the text before its first comma is listed for the
    category it appears under. Errors are logged, not raised.
    """
    backup_file = template_file + ".backup"
    try:
        shutil.copy2(template_file, backup_file)
        with open(template_file, "r", encoding="utf-8") as f:
            lines = f.readlines()

        new_lines = []
        current_cat = None
        to_remove = {cat: set(names) for cat, names in unmatched_template.items()}

        for line in lines:
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                new_lines.append(line)
                continue

            if "#genre#" in stripped:
                current_cat = stripped.split(",")[0].strip()
                new_lines.append(line)
                continue

            if current_cat:
                name = stripped.split(",")[0].strip()
                if current_cat in to_remove and name in to_remove[current_cat]:
                    continue
            new_lines.append(line)

        with open(template_file, "w", encoding="utf-8") as f:
            f.writelines(new_lines)
        logger.info("已移除无效频道")
    except Exception as e:
        logger.error(f"更新模板失败: {e}")


def main(template_file, tv_urls):
    """Fetch all sources, match them against the template and write the outputs.

    Steps: parse the template, download every source concurrently, merge the
    results in the order the sources are listed, match, write the playlists
    and the unmatched report, then remove unmatched entries from the template.

    The run stops before any file is written when there are no source URLs,
    when the template yields no categories, or when no source returned any
    channel. Without that last guard a network outage would mark every
    template entry as unmatched and strip the template.
    """
    urls = list(tv_urls) if tv_urls else []
    if not urls:
        logger.error("没有有效的直播源URL,程序退出。")
        return

    logger.info("开始解析模板...")
    template = parse_template(template_file)
    if not template:
        logger.error("模板为空或无法读取,程序退出。")
        return

    logger.info(f"开始从 {len(urls)} 个源获取数据...")
    # One slot per source so the merge below is independent of completion order.
    results = [None] * len(urls)

    with ThreadPoolExecutor(max_workers=5) as executor:
        future_to_index = {
            executor.submit(fetch_channels, url): index
            for index, url in enumerate(urls)
        }
        for future in as_completed(future_to_index):
            index = future_to_index[future]
            url = urls[index]
            try:
                data = future.result()
            except Exception as e:
                logger.error(f"源 {url} 异常: {e}")
                continue

            # fetch_channels reports its own failures by returning an empty
            # mapping, so only a non-empty result counts as success.
            if data:
                results[index] = data
                logger.info(f"源 {url} 获取成功")
            else:
                logger.warning(f"源 {url} 未返回任何频道")

    all_channels = OrderedDict()
    for data in results:
        if not data:
            continue
        for cat, chans in data.items():
            all_channels.setdefault(cat, []).extend(chans)

    if not all_channels:
        logger.error("未能从任何源获取到频道,保留现有输出和模板。")
        return

    logger.info("开始匹配...")
    matched, unmatched_tmpl, unmatched_src = match_channels(template, all_channels)

    generate_outputs(matched, template)
    lost_count = generate_unmatched_report(unmatched_tmpl, unmatched_src)

    if lost_count > 0:
        logger.info(f"清理 {lost_count} 个无效频道...")
        remove_unmatched_from_template(template_file, unmatched_tmpl)


if __name__ == "__main__":
    # Configuration
    TEMPLATE_FILE = "py/config/iptv.txt"
    URLS_FILE = "py/config/urls.txt"

    TV_URLS = load_urls_from_file(URLS_FILE)

    # No fallback sources are defined; main() exits on an empty list
    if not TV_URLS:
        logger.warning("未从文件中加载到URL,使用空列表")
        TV_URLS = []

    main(TEMPLATE_FILE, TV_URLS)