tv20/lib/51爆料.py
2026-01-02 03:32:24 +01:00

392 lines
16 KiB
Python

import json
import re
import sys
import hashlib
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider as BaseSpider
img_cache = {}
class Spider(BaseSpider):
def init(self, extend=""):
try:
self.proxies = json.loads(extend)
except:
self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
self.host = self.get_working_host()
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
print(f"使用站点: {self.host}")
def getName(self):
return "🌈 51爆料|终极完美版"
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
return False
def destroy(self):
global img_cache
img_cache.clear()
def get_working_host(self):
dynamic_urls = [
'https://across.iofjyyi.com/',
'https://admit.iofjyyi.com/',
'https://www.51baoliao01.com/',
'https://allow.iofjyyi.com/'
]
for url in dynamic_urls:
try:
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
return url
except Exception:
continue
return dynamic_urls[0]
def homeContent(self, filter):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'class': [], 'list': []}
data = self.getpq(response.text)
classes = []
category_selectors = ['.category-list ul li', '.nav-menu li', '.menu li', 'nav ul li']
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
if not href or href == '#' or not name: continue
classes.append({'type_name': name, 'type_id': href})
if classes: break
if not classes:
classes = [{'type_name': '最新', 'type_id': '/latest/'}, {'type_name': '热门', 'type_id': '/hot/'}]
return {'class': classes, 'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'class': [], 'list': []}
def homeVideoContent(self):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'list': []}
def categoryContent(self, tid, pg, filter, extend):
try:
if '@folder' in tid:
v = self.getfod(tid.replace('@folder', ''))
return {'list': v, 'page': 1, 'pagecount': 1, 'limit': 90, 'total': len(v)}
pg = int(pg) if pg else 1
if tid.startswith('http'):
base_url = tid.rstrip('/')
else:
path = tid if tid.startswith('/') else f"/{tid}"
base_url = f"{self.host}{path}".rstrip('/')
if pg == 1:
url = f"{base_url}/"
else:
url = f"{base_url}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article, #index article, article'), tid)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception as e:
return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
def detailContent(self, ids):
try:
url = ids[0] if ids[0].startswith('http') else f"{self.host}{ids[0]}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
data = self.getpq(response.text)
plist = []
used_names = set()
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
try:
config_attr = k.attr('data-config')
if config_attr:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
if video_url:
ep_name = ''
parent = k.parents().eq(0)
for _ in range(4):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text().strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
plist.append(f"{name}${video_url}")
except: continue
if not plist:
content_area = data('.post-content, article')
for i, link in enumerate(content_area('a').items(), start=1):
link_text = link.text().strip()
link_href = link.attr('href')
if link_href and any(kw in link_text for kw in ['点击观看', '观看', '播放', '视频', '第一弹', '第二弹', '第三弹', '第四弹', '第五弹', '第六弹', '第七弹', '第八弹', '第九弹', '第十弹']):
ep_name = link_text.replace('点击观看:', '').replace('点击观看', '').strip()
if not ep_name: ep_name = f"视频{i}"
if not link_href.startswith('http'):
link_href = f"{self.host}{link_href}" if link_href.startswith('/') else f"{self.host}/{link_href}"
plist.append(f"{ep_name}${link_href}")
play_url = '#'.join(plist) if plist else f"未找到视频源${url}"
vod_content = ''
try:
tags = []
seen_names = set()
seen_ids = set()
tag_links = data('.tags a, .keywords a, .post-tags a')
candidates = []
for k in tag_links.items():
title = k.text().strip()
href = k.attr('href')
if title and href:
candidates.append({'name': title, 'id': href})
candidates.sort(key=lambda x: len(x['name']), reverse=True)
for item in candidates:
name = item['name']
id_ = item['id']
if id_ in seen_ids: continue
is_duplicate = False
for seen in seen_names:
if name in seen:
is_duplicate = True
break
if not is_duplicate:
target = json.dumps({'id': id_, 'name': name})
tags.append(f'[a=cr:{target}/]{name}[/a]')
seen_names.add(name)
seen_ids.add(id_)
if tags:
vod_content = ' '.join(tags)
else:
vod_content = data('.post-title').text()
except Exception:
vod_content = '获取标签失败'
if not vod_content:
vod_content = data('h1').text() or '51爆料'
return {'list': [{'vod_play_from': '51爆料', 'vod_play_url': play_url, 'vod_content': vod_content}]}
except:
return {'list': [{'vod_play_from': '51爆料', 'vod_play_url': '获取失败'}]}
def searchContent(self, key, quick, pg="1"):
try:
pg = int(pg) if pg else 1
if pg == 1:
url = f"{self.host}/search/{key}/"
else:
url = f"{self.host}/search/{key}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
return {'list': self.getlist(self.getpq(response.text)('article')), 'page': pg, 'pagecount': 9999}
except:
return {'list': [], 'page': pg, 'pagecount': 9999}
def playerContent(self, flag, id, vipFlags):
parse = 0 if self.isVideoFormat(id) else 1
url = self.proxy(id) if '.m3u8' in id else id
return {'parse': parse, 'url': url, 'header': self.headers}
def localProxy(self, param):
try:
type_ = param.get('type')
url = param.get('url')
if type_ == 'cache':
key = param.get('key')
if content := img_cache.get(key):
return [200, 'image/jpeg', content]
return [404, 'text/plain', b'Expired']
elif type_ == 'img':
real_url = self.d64(url) if not url.startswith('http') else url
res = requests.get(real_url, headers=self.headers, proxies=self.proxies, timeout=10)
content = self.aesimg(res.content)
return [200, 'image/jpeg', content]
elif type_ == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url)
except:
return [404, 'text/plain', b'']
def proxy(self, data, type='m3u8'):
if data and self.proxies: return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
return data
def m3Proxy(self, url):
url = self.d64(url)
res = requests.get(url, headers=self.headers, proxies=self.proxies)
data = res.text
base = res.url.rsplit('/', 1)[0]
lines = []
for line in data.split('\n'):
if '#EXT' not in line and line.strip():
if not line.startswith('http'):
line = f"{base}/{line}"
lines.append(self.proxy(line, 'ts'))
else:
lines.append(line)
return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)]
def tsProxy(self, url):
return [200, 'video/mp2t', requests.get(self.d64(url), headers=self.headers, proxies=self.proxies).content]
def e64(self, text):
return b64encode(str(text).encode()).decode()
def d64(self, text):
return b64decode(str(text).encode()).decode()
def aesimg(self, data):
if len(data) < 16: return data
keys = [(b'f5d965df75336270', b'97b60394abc2fbe1'), (b'75336270f5d965df', b'abc2fbe197b60394')]
for k, v in keys:
try:
dec = unpad(AES.new(k, AES.MODE_CBC, v).decrypt(data), 16)
if dec.startswith(b'\xff\xd8') or dec.startswith(b'\x89PNG'): return dec
except: pass
try:
dec = unpad(AES.new(k, AES.MODE_ECB).decrypt(data), 16)
if dec.startswith(b'\xff\xd8'): return dec
except: pass
return data
def getlist(self, data, tid=''):
videos = []
is_folder = '/mrdg' in (tid or '')
for k in data.items():
card_html = k.outer_html() if hasattr(k, 'outer_html') else str(k)
a = k if k.is_('a') else k('a').eq(0)
href = a.attr('href')
title = k('h2').text() or k('.entry-title').text() or k('.post-title').text()
if not title and k.is_('a'): title = k.text()
if href and title:
img = self.getimg(k('script').text(), k, card_html)
videos.append({
'vod_id': f"{href}{'@folder' if is_folder else ''}",
'vod_name': title.strip(),
'vod_pic': img,
'vod_remarks': k('time').text() or '',
'vod_tag': 'folder' if is_folder else '',
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
videos = []
for i, h2 in enumerate(data('.post-content h2').items()):
p_txt = data('.post-content p').eq(i * 2)
p_img = data('.post-content p').eq(i * 2 + 1)
p_html = p_img.outer_html() if hasattr(p_img, 'outer_html') else str(p_img)
videos.append({
'vod_id': p_txt('a').attr('href'),
'vod_name': p_txt.text().strip(),
'vod_pic': self.getimg('', p_img, p_html),
'vod_remarks': h2.text().strip()
})
return videos
def getimg(self, text, elem=None, html_content=None):
if m := re.search(r"loadBannerDirect\('([^']+)'", text or ''):
return self._proc_url(m.group(1))
if html_content is None and elem is not None:
html_content = elem.outer_html() if hasattr(elem, 'outer_html') else str(elem)
if not html_content: return ''
html_content = html_content.replace('&quot;', '"').replace('&apos;', "'").replace('&amp;', '&')
if 'data:image' in html_content:
m = re.search(r'(data:image/[a-zA-Z0-9+/=;,]+)', html_content)
if m: return self._proc_url(m.group(1))
m = re.search(r'(https?://[^"\'\s)]+\.(?:jpg|png|jpeg|webp))', html_content, re.I)
if m: return self._proc_url(m.group(1))
if 'url(' in html_content:
m = re.search(r'url\s*\(\s*[\'"]?([^"\'\)]+)[\'"]?\s*\)', html_content, re.I)
if m: return self._proc_url(m.group(1))
return ''
def _proc_url(self, url):
if not url: return ''
url = url.strip('\'" ')
if url.startswith('data:'):
try:
_, b64_str = url.split(',', 1)
raw = b64decode(b64_str)
if not (raw.startswith(b'\xff\xd8') or raw.startswith(b'\x89PNG') or raw.startswith(b'GIF8')):
raw = self.aesimg(raw)
key = hashlib.md5(raw).hexdigest()
img_cache[key] = raw
return f"{self.getProxyUrl()}&type=cache&key={key}"
except: return ""
if not url.startswith('http'):
url = f"{self.host}{url}" if url.startswith('/') else f"{self.host}/{url}"
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=img"
def getpq(self, data):
try: return pq(data)
except: return pq(data.encode('utf-8'))