Mirror of https://github.com/cluntop/tvbox.git (synced 2026-01-12 02:18:34 +01:00)
Update Up
This commit is contained in:
parent
b2b95ba508
commit
dab388433d
1 changed file with 48 additions and 46 deletions
@@ -25,7 +25,7 @@ def get_session():
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

def load_urls_from_file(file_path):
    """Load the list of URLs from a text file."""
    urls = []
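
Note (illustration, not part of this commit): the hunk above only shows the adapter being mounted for both schemes. A minimal get_session consistent with it might look like the sketch below, assuming the adapter is a requests HTTPAdapter carrying a urllib3 Retry policy (the retry and backoff values are assumptions, not taken from this repository).

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def get_session():
    # Assumed retry policy: 3 attempts with backoff on common transient
    # HTTP errors; the real values are not visible in this diff.
    retry = Retry(total=3, backoff_factor=1,
                  status_forcelist=[429, 500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
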
@@ -55,7 +55,7 @@ def parse_template(template_file):
            line = line.strip()
            if not line or line.startswith("#"):
                continue

            if "#genre#" in line:
                current_category = line.split(",")[0].strip()
                template_channels[current_category] = []
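
Note (illustration, not part of this commit): the template is a plain-text file in which a ",#genre#" line opens a category and each following line names a channel, with "|" separating acceptable name variants. A tiny sketch of that parsing behaviour; the sample category and channels are made up, and the elif branch is an assumption about the unchanged remainder of parse_template.

from collections import OrderedDict

# Made-up template content: one category and two channel lines, the second
# listing two acceptable name variants separated by "|".
sample_template = "央视,#genre#\nCCTV1\nCCTV5|CCTV5+\n"

template_channels = OrderedDict()
current_category = None
for line in sample_template.splitlines():
    line = line.strip()
    if not line or line.startswith("#"):
        continue
    if "#genre#" in line:
        current_category = line.split(",")[0].strip()
        template_channels[current_category] = []
    elif current_category:  # assumed: channel lines are appended to the open category
        template_channels[current_category].append(line)

print(dict(template_channels))
# {'央视': ['CCTV1', 'CCTV5|CCTV5+']}
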
@@ -71,12 +71,12 @@ def fetch_channels(url):
    """Fetch a channel list from a URL."""
    channels = OrderedDict()
    session = get_session()

    try:
        response = session.get(url, timeout=30)
        response.raise_for_status()
        response.encoding = response.apparent_encoding or "utf-8"

        lines = [line.strip() for line in response.text.splitlines() if line.strip()]
        if not lines:
            return channels
@@ -86,7 +86,7 @@ def fetch_channels(url):
        if is_m3u:
            current_category = "默认分类"
            current_name = "未知频道"

            re_group = re.compile(r'group-title="([^"]*)"')
            re_name = re.compile(r',([^,]*)$')
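
Note (illustration, not part of this commit): re_group captures the group-title attribute and re_name captures everything after the last comma of an #EXTINF metadata line. A quick demonstration with a made-up line:

import re

re_group = re.compile(r'group-title="([^"]*)"')
re_name = re.compile(r',([^,]*)$')

# Made-up #EXTINF line of the kind fetch_channels parses.
extinf = '#EXTINF:-1 tvg-name="CCTV1" group-title="央视",CCTV1 综合'

print(re_group.search(extinf).group(1))   # 央视
print(re_name.search(extinf).group(1))    # CCTV1 综合
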
@@ -128,7 +128,7 @@ def match_channels(template_channels, all_channels):

    matched = OrderedDict()
    unmatched_template = OrderedDict()

    # 1. Flatten the source data
    flattened_source_channels = []
    for cat, chans in all_channels.items():
@@ -142,7 +142,7 @@ def match_channels(template_channels, all_channels):
            })

    used_channel_keys = set()

    # Initialise the result containers
    for cat in template_channels:
        matched[cat] = OrderedDict()
@@ -152,31 +152,33 @@ def match_channels(template_channels, all_channels):
    for category, tmpl_names in template_channels.items():
        for tmpl_name in tmpl_names:

            variants = [n.strip() for n in tmpl_name.split("|") if n.strip()]
            variants_raw = [n.strip() for n in tmpl_name.split("|") if n.strip()]
            variants = list(OrderedDict.fromkeys(variants_raw))

            primary_name = variants[0]

            found_for_this_template = False

            for variant in variants:
                variant_lower = variant.lower()

                pattern = re.compile(re.escape(variant_lower) + r'($|[^a-z0-9\+])')

                for src in flattened_source_channels:
                    if src['key'] in used_channel_keys:
                        continue

                    # Match found
                    if variant_lower in src['norm_name']:
                    # Use a regex search instead of a plain substring check
                    if pattern.search(src['norm_name']):
                        # Initialise this channel's list
                        if primary_name not in matched[category]:
                            matched[category][primary_name] = []

                        # Append the source entry
                        matched[category][primary_name].append((src['name'], src['url']))

                        used_channel_keys.add(src['key'])
                        found_for_this_template = True

            if not found_for_this_template:
                unmatched_template[category].append(tmpl_name)
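
Note (illustration, not part of this commit): this hunk carries the substance of the change: duplicate variants in a template line are collapsed, and the plain substring test is replaced by a regex that only accepts the variant when it is followed by end-of-string or a character outside a-z, 0-9 and "+". That stops a short template name from swallowing longer source names. A small self-contained sketch with made-up normalised names:

import re
from collections import OrderedDict

# Collapse duplicate variants while keeping their order, as the hunk does.
variants_raw = ["CCTV1", "cctv1", "CCTV1"]
variants = list(OrderedDict.fromkeys(variants_raw))   # ['CCTV1', 'cctv1']

variant_lower = variants[0].lower()
pattern = re.compile(re.escape(variant_lower) + r'($|[^a-z0-9\+])')

for norm_name in ["cctv1", "cctv1 hd", "cctv11", "cctv1+"]:
    print(norm_name, "->", bool(pattern.search(norm_name)))
# cctv1 -> True     (variant sits at the end of the name)
# cctv1 hd -> True  (followed by a space)
# cctv11 -> False   (followed by another digit, so CCTV1 no longer matches CCTV11)
# cctv1+ -> False   ("+" is excluded, so CCTV5 would not claim CCTV5+)
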
@@ -194,12 +196,12 @@ def is_ipv6(url):
    return "://[" in url


def generate_outputs(channels, template_channels):
    """Generate the output files, aggregating multiple streams per channel."""
    written_urls = set()

    # Make sure the output directory exists
    os.makedirs("lib", exist_ok=True)

    output_m3u_path = "lib/iptv.m3u"
    output_txt_path = "lib/iptv.txt"
@@ -207,47 +209,47 @@ def generate_outputs(channels, template_channels):
        with write_lock:
            with open(output_m3u_path, "w", encoding="utf-8") as m3u, \
                 open(output_txt_path, "w", encoding="utf-8") as txt:

                m3u.write("#EXTM3U\n")

                for category in template_channels:
                    if category not in channels or not channels[category]:
                        continue

                    txt.write(f"\n{category},#genre#\n")

                    for channel_key_name, channel_list in channels[category].items():

                        # Deduplicate URLs
                        unique_urls = []
                        seen_urls = set()

                        for _, url in channel_list:
                            if url not in seen_urls and url not in written_urls:
                                unique_urls.append(url)
                                seen_urls.add(url)
                                written_urls.add(url)

                        total_lines = len(unique_urls)
                        for idx, url in enumerate(unique_urls, 1):
                            base_url = url.split("$")[0]
                            suffix_name = "IPV6" if is_ipv6(url) else "IPV4"

                            # Build the display name, forcing the template's primary name
                            # Force the template's primary name
                            display_name = channel_key_name

                            # Build the suffix
                            meta_suffix = f"$LR•{suffix_name}"
                            if total_lines > 1:
                                meta_suffix += f"•{total_lines}『线路{idx}』"

                            final_url = f"{base_url}{meta_suffix}"

                            m3u.write(f'#EXTINF:-1 tvg-name="{display_name}" group-title="{category}",{display_name}\n')
                            m3u.write(f"{final_url}\n")

                            txt.write(f"{display_name},{final_url}\n")

        logger.info("输出完成。")
    except Exception as e:
        logger.error(f"写入文件失败: {e}")
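
Note (illustration, not part of this commit): each deduplicated URL is rewritten with a $LR suffix that records the IP family and, when a channel has more than one stream, the line index. A self-contained sketch of that string construction with a hypothetical URL:

# Hypothetical input: one channel with three deduplicated URLs; this mirrors
# the suffix logic in the hunk above.
def label(url, idx, total_lines, ipv6=False):
    base_url = url.split("$")[0]          # drop any existing "$..." metadata
    suffix_name = "IPV6" if ipv6 else "IPV4"
    meta_suffix = f"$LR•{suffix_name}"
    if total_lines > 1:
        meta_suffix += f"•{total_lines}『线路{idx}』"
    return f"{base_url}{meta_suffix}"

print(label("http://example.com/cctv1.m3u8", 1, 3))
# http://example.com/cctv1.m3u8$LR•IPV4•3『线路1』
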
@@ -257,7 +259,7 @@ def generate_unmatched_report(unmatched_template, unmatched_source):
    os.makedirs("py/config", exist_ok=True)
    report_file = "py/config/iptv_test.txt"
    total_template_lost = sum(len(v) for v in unmatched_template.values())

    try:
        with open(report_file, "w", encoding="utf-8") as f:
            f.write(f"# 未匹配报告 {datetime.now()}\n")
@@ -268,7 +270,7 @@ def generate_unmatched_report(unmatched_template, unmatched_source):
                f.write(f"\n{cat},#genre#\n")
                for name in list(OrderedDict.fromkeys(names)):
                    f.write(f"{name},\n")

            f.write("\n\n## 源中有但模板无\n")
            for cat, chans in unmatched_source.items():
                if chans:
@@ -287,7 +289,7 @@ def remove_unmatched_from_template(template_file, unmatched_template):
    shutil.copy2(template_file, backup_file)
    with open(template_file, "r", encoding="utf-8") as f:
        lines = f.readlines()

    new_lines = []
    current_cat = None
    to_remove = {cat: set(names) for cat, names in unmatched_template.items()}
@@ -306,7 +308,7 @@ def remove_unmatched_from_template(template_file, unmatched_template):
            if current_cat in to_remove and name in to_remove[current_cat]:
                continue
        new_lines.append(line)

    with open(template_file, "w", encoding="utf-8") as f:
        f.writelines(new_lines)
    logger.info("已移除无效频道")
@@ -320,10 +322,10 @@ def main(template_file, tv_urls):

    logger.info("开始解析模板...")
    template = parse_template(template_file)

    logger.info(f"开始从 {len(tv_urls)} 个源获取数据...")
    all_channels = OrderedDict()

    with ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(fetch_channels, url): url for url in tv_urls}
        for future in as_completed(future_to_url):
@@ -341,10 +343,10 @@ def main(template_file, tv_urls):

    logger.info("开始匹配...")
    matched, unmatched_tmpl, unmatched_src = match_channels(template, all_channels)

    generate_outputs(matched, template)
    lost_count = generate_unmatched_report(unmatched_tmpl, unmatched_src)

    if lost_count > 0:
        logger.info(f"清理 {lost_count} 个无效频道...")
        remove_unmatched_from_template(template_file, unmatched_tmpl)
@@ -353,12 +355,12 @@ if __name__ == "__main__":
    # Configuration
    TEMPLATE_FILE = "py/config/iptv.txt"
    URLS_FILE = "py/config/urls.txt"

    TV_URLS = load_urls_from_file(URLS_FILE)

    # Fallback sources
    if not TV_URLS:
        logger.warning("未从文件中加载到URL,使用空列表")
        TV_URLS = []

    main(TEMPLATE_FILE, TV_URLS)