Upload files to lib

dong 2026-01-02 03:32:42 +01:00
parent 7c6c76b80b
commit 7fd3cfead8
5 changed files with 1798 additions and 0 deletions

lib/818黑料网.py (new file, 389 lines)

@@ -0,0 +1,389 @@
import json
import re
import sys
import hashlib
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider as BaseSpider
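# Module-level cache: md5(decrypted image bytes) -> bytes, served back via localProxy type 'cache'.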
img_cache = {}
class Spider(BaseSpider):
def init(self, extend=""):
try:
self.proxies = json.loads(extend)
except:
self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
self.host = self.get_working_host()
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
print(f"使用站点: {self.host}")
def getName(self):
return "🌈 818黑料网|终极完美版"
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
return False
def destroy(self):
global img_cache
img_cache.clear()
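# Probe the candidate mirrors in order and use the first one that answers HTTP 200; fall back to the first entry if none respond.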
def get_working_host(self):
dynamic_urls = [
'https://cell.lacdfsq.cc/'
]
for url in dynamic_urls:
try:
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
return url
except Exception:
continue
return dynamic_urls[0]
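# Home page: try several theme-specific selectors for category links, defaulting to 最新/热门 when none match, then list the front-page articles.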
def homeContent(self, filter):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'class': [], 'list': []}
data = self.getpq(response.text)
classes = []
category_selectors = ['.category-list ul li', '.nav-menu li', '.menu li', 'nav ul li']
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
if not href or href == '#' or not name: continue
classes.append({'type_name': name, 'type_id': href})
if classes: break
if not classes:
classes = [{'type_name': '最新', 'type_id': '/latest/'}, {'type_name': '热门', 'type_id': '/hot/'}]
return {'class': classes, 'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'class': [], 'list': []}
def homeVideoContent(self):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'list': []}
def categoryContent(self, tid, pg, filter, extend):
try:
if '@folder' in tid:
v = self.getfod(tid.replace('@folder', ''))
return {'list': v, 'page': 1, 'pagecount': 1, 'limit': 90, 'total': len(v)}
pg = int(pg) if pg else 1
if tid.startswith('http'):
base_url = tid.rstrip('/')
else:
path = tid if tid.startswith('/') else f"/{tid}"
base_url = f"{self.host}{path}".rstrip('/')
if pg == 1:
url = f"{base_url}/"
else:
url = f"{base_url}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article, #index article, article'), tid)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception as e:
return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
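# Detail page: prefer DPlayer embeds (JSON in the data-config attribute); otherwise fall back to '点击观看'-style links in the post body.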
def detailContent(self, ids):
try:
url = ids[0] if ids[0].startswith('http') else f"{self.host}{ids[0]}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
data = self.getpq(response.text)
plist = []
used_names = set()
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
try:
config_attr = k.attr('data-config')
if config_attr:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
if video_url:
ep_name = ''
parent = k.parents().eq(0)
for _ in range(4):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text().strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
plist.append(f"{name}${video_url}")
except: continue
if not plist:
content_area = data('.post-content, article')
for i, link in enumerate(content_area('a').items(), start=1):
link_text = link.text().strip()
link_href = link.attr('href')
if link_href and any(kw in link_text for kw in ['点击观看', '观看', '播放', '视频', '第一弹', '第二弹', '第三弹', '第四弹', '第五弹', '第六弹', '第七弹', '第八弹', '第九弹', '第十弹']):
ep_name = link_text.replace('点击观看:', '').replace('点击观看', '').strip()
if not ep_name: ep_name = f"视频{i}"
if not link_href.startswith('http'):
link_href = f"{self.host}{link_href}" if link_href.startswith('/') else f"{self.host}/{link_href}"
plist.append(f"{ep_name}${link_href}")
play_url = '#'.join(plist) if plist else f"未找到视频源${url}"
vod_content = ''
try:
tags = []
seen_names = set()
seen_ids = set()
tag_links = data('.tags a, .keywords a, .post-tags a')
candidates = []
for k in tag_links.items():
title = k.text().strip()
href = k.attr('href')
if title and href:
candidates.append({'name': title, 'id': href})
candidates.sort(key=lambda x: len(x['name']), reverse=True)
for item in candidates:
name = item['name']
id_ = item['id']
if id_ in seen_ids: continue
is_duplicate = False
for seen in seen_names:
if name in seen:
is_duplicate = True
break
if not is_duplicate:
target = json.dumps({'id': id_, 'name': name})
tags.append(f'[a=cr:{target}/]{name}[/a]')
seen_names.add(name)
seen_ids.add(id_)
if tags:
vod_content = ' '.join(tags)
else:
vod_content = data('.post-title').text()
except Exception:
vod_content = '获取标签失败'
if not vod_content:
vod_content = data('h1').text() or '818黑料网'
return {'list': [{'vod_play_from': '818黑料网', 'vod_play_url': play_url, 'vod_content': vod_content}]}
except:
return {'list': [{'vod_play_from': '818黑料网', 'vod_play_url': '获取失败'}]}
def searchContent(self, key, quick, pg="1"):
try:
pg = int(pg) if pg else 1
if pg == 1:
url = f"{self.host}/search/{key}/"
else:
url = f"{self.host}/search/{key}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
return {'list': self.getlist(self.getpq(response.text)('article')), 'page': pg, 'pagecount': 9999}
except:
return {'list': [], 'page': pg, 'pagecount': 9999}
def playerContent(self, flag, id, vipFlags):
parse = 0 if self.isVideoFormat(id) else 1
url = self.proxy(id) if '.m3u8' in id else id
return {'parse': parse, 'url': url, 'header': self.headers}
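# localProxy dispatch: 'cache' serves decrypted images from img_cache, 'img' fetches and decrypts a remote cover, 'm3u8'/'ts' relay stream traffic.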
def localProxy(self, param):
try:
type_ = param.get('type')
url = param.get('url')
if type_ == 'cache':
key = param.get('key')
if content := img_cache.get(key):
return [200, 'image/jpeg', content]
return [404, 'text/plain', b'Expired']
elif type_ == 'img':
real_url = self.d64(url) if not url.startswith('http') else url
res = requests.get(real_url, headers=self.headers, proxies=self.proxies, timeout=10)
content = self.aesimg(res.content)
return [200, 'image/jpeg', content]
elif type_ == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url)
except:
return [404, 'text/plain', b'']
def proxy(self, data, type='m3u8'):
if data and self.proxies: return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
return data
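# Fetch the playlist, resolve relative segment URIs against the final response URL, and rewrite each one through the local proxy (as type 'ts') when proxies are configured.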
def m3Proxy(self, url):
url = self.d64(url)
res = requests.get(url, headers=self.headers, proxies=self.proxies)
data = res.text
base = res.url.rsplit('/', 1)[0]
lines = []
for line in data.split('\n'):
if '#EXT' not in line and line.strip():
if not line.startswith('http'):
line = f"{base}/{line}"
lines.append(self.proxy(line, 'ts'))
else:
lines.append(line)
return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)]
def tsProxy(self, url):
return [200, 'video/mp2t', requests.get(self.d64(url), headers=self.headers, proxies=self.proxies).content]
def e64(self, text):
return b64encode(str(text).encode()).decode()
def d64(self, text):
return b64decode(str(text).encode()).decode()
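# Covers may be AES-encrypted: try the two known key/IV pairs in CBC mode, then ECB, accepting a result only if it starts with a JPEG/PNG magic number.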
def aesimg(self, data):
if len(data) < 16: return data
keys = [(b'f5d965df75336270', b'97b60394abc2fbe1'), (b'75336270f5d965df', b'abc2fbe197b60394')]
for k, v in keys:
try:
dec = unpad(AES.new(k, AES.MODE_CBC, v).decrypt(data), 16)
if dec.startswith(b'\xff\xd8') or dec.startswith(b'\x89PNG'): return dec
except: pass
try:
dec = unpad(AES.new(k, AES.MODE_ECB).decrypt(data), 16)
if dec.startswith(b'\xff\xd8'): return dec
except: pass
return data
def getlist(self, data, tid=''):
videos = []
is_folder = '/mrdg' in (tid or '')
for k in data.items():
card_html = k.outer_html() if hasattr(k, 'outer_html') else str(k)
a = k if k.is_('a') else k('a').eq(0)
href = a.attr('href')
title = k('h2').text() or k('.entry-title').text() or k('.post-title').text()
if not title and k.is_('a'): title = k.text()
if href and title:
img = self.getimg(k('script').text(), k, card_html)
videos.append({
'vod_id': f"{href}{'@folder' if is_folder else ''}",
'vod_name': title.strip(),
'vod_pic': img,
'vod_remarks': k('time').text() or '',
'vod_tag': 'folder' if is_folder else '',
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
videos = []
for i, h2 in enumerate(data('.post-content h2').items()):
p_txt = data('.post-content p').eq(i * 2)
p_img = data('.post-content p').eq(i * 2 + 1)
p_html = p_img.outer_html() if hasattr(p_img, 'outer_html') else str(p_img)
videos.append({
'vod_id': p_txt('a').attr('href'),
'vod_name': p_txt.text().strip(),
'vod_pic': self.getimg('', p_img, p_html),
'vod_remarks': h2.text().strip()
})
return videos
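# Cover extraction order: inline loadBannerDirect(...) script, data: URI, direct image URL, CSS background-image.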
def getimg(self, text, elem=None, html_content=None):
if m := re.search(r"loadBannerDirect\('([^']+)'", text or ''):
return self._proc_url(m.group(1))
if html_content is None and elem is not None:
html_content = elem.outer_html() if hasattr(elem, 'outer_html') else str(elem)
if not html_content: return ''
html_content = html_content.replace('&quot;', '"').replace('&apos;', "'").replace('&amp;', '&')
if 'data:image' in html_content:
m = re.search(r'(data:image/[a-zA-Z0-9+/=;,]+)', html_content)
if m: return self._proc_url(m.group(1))
m = re.search(r'(https?://[^"\'\s)]+\.(?:jpg|png|jpeg|webp))', html_content, re.I)
if m: return self._proc_url(m.group(1))
if 'url(' in html_content:
m = re.search(r'url\s*\(\s*[\'"]?([^"\'\)]+)[\'"]?\s*\)', html_content, re.I)
if m: return self._proc_url(m.group(1))
return ''
def _proc_url(self, url):
if not url: return ''
url = url.strip('\'" ')
if url.startswith('data:'):
try:
_, b64_str = url.split(',', 1)
raw = b64decode(b64_str)
if not (raw.startswith(b'\xff\xd8') or raw.startswith(b'\x89PNG') or raw.startswith(b'GIF8')):
raw = self.aesimg(raw)
key = hashlib.md5(raw).hexdigest()
img_cache[key] = raw
return f"{self.getProxyUrl()}&type=cache&key={key}"
except: return ""
if not url.startswith('http'):
url = f"{self.host}{url}" if url.startswith('/') else f"{self.host}/{url}"
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=img"
def getpq(self, data):
try: return pq(data)
except: return pq(data.encode('utf-8'))

lib/911.py (new file, 390 lines)

@@ -0,0 +1,390 @@
import json
import re
import sys
import hashlib
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider as BaseSpider
img_cache = {}
class Spider(BaseSpider):
def init(self, extend=""):
try:
self.proxies = json.loads(extend)
except:
self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
self.host = self.get_working_host()
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
print(f"使用站点: {self.host}")
def getName(self):
return "🌈 911爆料网|终极完美版"
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
return False
def destroy(self):
global img_cache
img_cache.clear()
def get_working_host(self):
dynamic_urls = [
'https://army.jiiccrt.xyz/',
'https://911blw.com/'
]
for url in dynamic_urls:
try:
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
return url
except Exception:
continue
return dynamic_urls[0]
def homeContent(self, filter):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'class': [], 'list': []}
data = self.getpq(response.text)
classes = []
category_selectors = ['.category-list ul li', '.nav-menu li', '.menu li', 'nav ul li']
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
if not href or href == '#' or not name: continue
classes.append({'type_name': name, 'type_id': href})
if classes: break
if not classes:
classes = [{'type_name': '最新', 'type_id': '/latest/'}, {'type_name': '热门', 'type_id': '/hot/'}]
return {'class': classes, 'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'class': [], 'list': []}
def homeVideoContent(self):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'list': []}
def categoryContent(self, tid, pg, filter, extend):
try:
if '@folder' in tid:
v = self.getfod(tid.replace('@folder', ''))
return {'list': v, 'page': 1, 'pagecount': 1, 'limit': 90, 'total': len(v)}
pg = int(pg) if pg else 1
if tid.startswith('http'):
base_url = tid.rstrip('/')
else:
path = tid if tid.startswith('/') else f"/{tid}"
base_url = f"{self.host}{path}".rstrip('/')
if pg == 1:
url = f"{base_url}/"
else:
url = f"{base_url}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article, #index article, article'), tid)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception as e:
return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
def detailContent(self, ids):
try:
url = ids[0] if ids[0].startswith('http') else f"{self.host}{ids[0]}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
data = self.getpq(response.text)
plist = []
used_names = set()
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
try:
config_attr = k.attr('data-config')
if config_attr:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
if video_url:
ep_name = ''
parent = k.parents().eq(0)
for _ in range(4):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text().strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
plist.append(f"{name}${video_url}")
except: continue
if not plist:
content_area = data('.post-content, article')
for i, link in enumerate(content_area('a').items(), start=1):
link_text = link.text().strip()
link_href = link.attr('href')
if link_href and any(kw in link_text for kw in ['点击观看', '观看', '播放', '视频', '第一弹', '第二弹', '第三弹', '第四弹', '第五弹', '第六弹', '第七弹', '第八弹', '第九弹', '第十弹']):
ep_name = link_text.replace('点击观看:', '').replace('点击观看', '').strip()
if not ep_name: ep_name = f"视频{i}"
if not link_href.startswith('http'):
link_href = f"{self.host}{link_href}" if link_href.startswith('/') else f"{self.host}/{link_href}"
plist.append(f"{ep_name}${link_href}")
play_url = '#'.join(plist) if plist else f"未找到视频源${url}"
vod_content = ''
try:
tags = []
seen_names = set()
seen_ids = set()
tag_links = data('.tags a, .keywords a, .post-tags a')
candidates = []
for k in tag_links.items():
title = k.text().strip()
href = k.attr('href')
if title and href:
candidates.append({'name': title, 'id': href})
candidates.sort(key=lambda x: len(x['name']), reverse=True)
for item in candidates:
name = item['name']
id_ = item['id']
if id_ in seen_ids: continue
is_duplicate = False
for seen in seen_names:
if name in seen:
is_duplicate = True
break
if not is_duplicate:
target = json.dumps({'id': id_, 'name': name})
tags.append(f'[a=cr:{target}/]{name}[/a]')
seen_names.add(name)
seen_ids.add(id_)
if tags:
vod_content = ' '.join(tags)
else:
vod_content = data('.post-title').text()
except Exception:
vod_content = '获取标签失败'
if not vod_content:
vod_content = data('h1').text() or '911爆料网'
return {'list': [{'vod_play_from': '911爆料网', 'vod_play_url': play_url, 'vod_content': vod_content}]}
except:
return {'list': [{'vod_play_from': '911爆料网', 'vod_play_url': '获取失败'}]}
def searchContent(self, key, quick, pg="1"):
try:
pg = int(pg) if pg else 1
if pg == 1:
url = f"{self.host}/search/{key}/"
else:
url = f"{self.host}/search/{key}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
return {'list': self.getlist(self.getpq(response.text)('article')), 'page': pg, 'pagecount': 9999}
except:
return {'list': [], 'page': pg, 'pagecount': 9999}
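# parse=1 asks the host app to sniff the real stream out of a web page; direct media URLs (.m3u8/.mp4/.ts) play as-is, m3u8 optionally via the local proxy.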
def playerContent(self, flag, id, vipFlags):
parse = 0 if self.isVideoFormat(id) else 1
url = self.proxy(id) if '.m3u8' in id else id
return {'parse': parse, 'url': url, 'header': self.headers}
def localProxy(self, param):
try:
type_ = param.get('type')
url = param.get('url')
if type_ == 'cache':
key = param.get('key')
if content := img_cache.get(key):
return [200, 'image/jpeg', content]
return [404, 'text/plain', b'Expired']
elif type_ == 'img':
real_url = self.d64(url) if not url.startswith('http') else url
res = requests.get(real_url, headers=self.headers, proxies=self.proxies, timeout=10)
content = self.aesimg(res.content)
return [200, 'image/jpeg', content]
elif type_ == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url)
except:
return [404, 'text/plain', b'']
def proxy(self, data, type='m3u8'):
if data and self.proxies: return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
return data
def m3Proxy(self, url):
url = self.d64(url)
res = requests.get(url, headers=self.headers, proxies=self.proxies)
data = res.text
base = res.url.rsplit('/', 1)[0]
lines = []
for line in data.split('\n'):
if '#EXT' not in line and line.strip():
if not line.startswith('http'):
line = f"{base}/{line}"
lines.append(self.proxy(line, 'ts'))
else:
lines.append(line)
return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)]
def tsProxy(self, url):
return [200, 'video/mp2t', requests.get(self.d64(url), headers=self.headers, proxies=self.proxies).content]
def e64(self, text):
return b64encode(str(text).encode()).decode()
def d64(self, text):
return b64decode(str(text).encode()).decode()
def aesimg(self, data):
if len(data) < 16: return data
keys = [(b'f5d965df75336270', b'97b60394abc2fbe1'), (b'75336270f5d965df', b'abc2fbe197b60394')]
for k, v in keys:
try:
dec = unpad(AES.new(k, AES.MODE_CBC, v).decrypt(data), 16)
if dec.startswith(b'\xff\xd8') or dec.startswith(b'\x89PNG'): return dec
except: pass
try:
dec = unpad(AES.new(k, AES.MODE_ECB).decrypt(data), 16)
if dec.startswith(b'\xff\xd8'): return dec
except: pass
return data
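# Build the vod list from article cards; category ids containing '/mrdg' are folders whose children are expanded later by getfod().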
def getlist(self, data, tid=''):
videos = []
is_folder = '/mrdg' in (tid or '')
for k in data.items():
card_html = k.outer_html() if hasattr(k, 'outer_html') else str(k)
a = k if k.is_('a') else k('a').eq(0)
href = a.attr('href')
title = k('h2').text() or k('.entry-title').text() or k('.post-title').text()
if not title and k.is_('a'): title = k.text()
if href and title:
img = self.getimg(k('script').text(), k, card_html)
videos.append({
'vod_id': f"{href}{'@folder' if is_folder else ''}",
'vod_name': title.strip(),
'vod_pic': img,
'vod_remarks': k('time').text() or '',
'vod_tag': 'folder' if is_folder else '',
'style': {"type": "rect", "ratio": 1.33}
})
return videos
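# Folder pages alternate an <h2> heading with a pair of paragraphs (text link, then image); pair them up by index.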
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
videos = []
for i, h2 in enumerate(data('.post-content h2').items()):
p_txt = data('.post-content p').eq(i * 2)
p_img = data('.post-content p').eq(i * 2 + 1)
p_html = p_img.outer_html() if hasattr(p_img, 'outer_html') else str(p_img)
videos.append({
'vod_id': p_txt('a').attr('href'),
'vod_name': p_txt.text().strip(),
'vod_pic': self.getimg('', p_img, p_html),
'vod_remarks': h2.text().strip()
})
return videos
def getimg(self, text, elem=None, html_content=None):
if m := re.search(r"loadBannerDirect\('([^']+)'", text or ''):
return self._proc_url(m.group(1))
if html_content is None and elem is not None:
html_content = elem.outer_html() if hasattr(elem, 'outer_html') else str(elem)
if not html_content: return ''
html_content = html_content.replace('&quot;', '"').replace('&apos;', "'").replace('&amp;', '&')
if 'data:image' in html_content:
m = re.search(r'(data:image/[a-zA-Z0-9+/=;,]+)', html_content)
if m: return self._proc_url(m.group(1))
m = re.search(r'(https?://[^"\'\s)]+\.(?:jpg|png|jpeg|webp))', html_content, re.I)
if m: return self._proc_url(m.group(1))
if 'url(' in html_content:
m = re.search(r'url\s*\(\s*[\'"]?([^"\'\)]+)[\'"]?\s*\)', html_content, re.I)
if m: return self._proc_url(m.group(1))
return ''
def _proc_url(self, url):
if not url: return ''
url = url.strip('\'" ')
if url.startswith('data:'):
try:
_, b64_str = url.split(',', 1)
raw = b64decode(b64_str)
if not (raw.startswith(b'\xff\xd8') or raw.startswith(b'\x89PNG') or raw.startswith(b'GIF8')):
raw = self.aesimg(raw)
key = hashlib.md5(raw).hexdigest()
img_cache[key] = raw
return f"{self.getProxyUrl()}&type=cache&key={key}"
except: return ""
if not url.startswith('http'):
url = f"{self.host}{url}" if url.startswith('/') else f"{self.host}/{url}"
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=img"
def getpq(self, data):
try: return pq(data)
except: return pq(data.encode('utf-8'))

lib/91吃瓜.py (new file, 236 lines)

@@ -0,0 +1,236 @@
#!/usr/bin/env python
# coding=utf-8
import sys
import requests
from bs4 import BeautifulSoup
import re
from urllib.parse import urljoin
import time
sys.path.append('..')  # make the parent directory importable before loading base.spider
from base.spider import Spider
# Global configuration
xurl = "https://barely.vmwzzqom.cc/"
backup_urls = ["https://hlj.fun", "https://911bl16.com"]
headerx = {
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Mobile/15E148 Safari/604.1",
"Referer": "https://911blw.com",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8"
}
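# Substrings that mark an <img> as a UI asset (ad close button, icons, logos) rather than a poster.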
IMAGE_FILTER = ["/usr/themes/ads-close.png", "close", "icon", "logo"]
class Spider(Spider):
def getName(self):
return "911爆料网"
def init(self, extend):
pass
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
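# Fetch a page, retrying through backup_urls when the primary response looks truncated (under 1000 characters).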
def fetch_page(self, url, use_backup=False):
global xurl
original_url = url
if use_backup:
for backup in backup_urls:
test_url = url.replace(xurl, backup)
try:
time.sleep(1)
res = requests.get(test_url, headers=headerx, timeout=10)
res.raise_for_status()
res.encoding = "utf-8"
text = res.text
if len(text) > 1000:
print(f"[DEBUG] 使用备用 {backup}: {test_url}")
return text
except:
continue
print(f"[ERROR] 所有备用失败,回退原 URL")
try:
time.sleep(1)
res = requests.get(original_url, headers=headerx, timeout=10)
res.raise_for_status()
res.encoding = "utf-8"
text = res.text
doc = BeautifulSoup(text, "html.parser")
title = doc.title.string if doc.title else "no title"
print(f"[DEBUG] Page {original_url}: length={len(text)}, title={title}")
if len(text) < 1000:
print(f"[DEBUG] 内容过短,尝试备用域名")
return self.fetch_page(original_url, use_backup=True)
return text
except Exception as e:
print(f"[ERROR] 请求失败 {original_url}: {e}")
return None
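# Parse article cards from a listing page; the selector list covers several theme layouts, capped at 20 items per page.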
def extract_content(self, html, url):
videos = []
if not html:
return videos
doc = BeautifulSoup(html, "html.parser")
containers = doc.select("ul.row li, div.article-item, article, .post-item, div[class*='item']")
print(f"[DEBUG] 找到 {len(containers)} 个容器")
for i, vod in enumerate(containers[:20], 1):
try:
# Title
title_elem = vod.select_one("h2.headline, .headline, a[title]")
name = (title_elem.get("title") or title_elem.get_text(strip=True)) if title_elem else ""
if not name:
name_match = re.search(r'headline">(.+?)<', str(vod))
name = name_match.group(1).strip() if name_match else ""
# Link
link_elem = vod.select_one("a")
id = urljoin(xurl, link_elem["href"]) if link_elem else ""
# Remarks
remarks_elem = vod.select_one("span.small, time, .date")
remarks = remarks_elem.get_text(strip=True) if remarks_elem else ""
if not remarks:
remarks_match = re.search(r'datePublished[^>]*>(.+?)<', str(vod))
remarks = remarks_match.group(1).strip() if remarks_match else ""
# Image: check extended attributes
img = vod.select_one("img")
pic = None
if img:
# Check multiple image attributes
for attr in ["data-lazy-src", "data-original", "data-src", "src"]:
pic = img.get(attr)
if pic:
break
# Check background image
if not pic:
bg_div = vod.select_one("div[style*='background-image']")
if bg_div and "background-image" in bg_div.get("style", ""):
bg_match = re.search(r'url\([\'"]?(.+?)[\'"]?\)', bg_div["style"])
pic = bg_match.group(1) if bg_match else None
if pic:
pic = urljoin(xurl, pic)
alt = img.get("alt", "").lower() if img else ""
if any(f in pic.lower() or f in alt for f in IMAGE_FILTER):
pic = None
print(f"[DEBUG] 项 {i} 图片: {pic}, 属性={img.attrs if img else '无img'}")
# Description
desc_match = re.search(r'og:description" content="(.+?)"', html)
description = desc_match.group(1) if desc_match else ""
if name and id:
video = {
"vod_id": id,
"vod_name": name[:100],
"vod_pic": pic,
"vod_remarks": remarks,
"vod_content": description
}
videos.append(video)
print(f"[DEBUG] 项 {i}: 标题={name[:50]}..., 链接={id}, 图片={pic}")
except Exception as e:
print(f"[DEBUG] 项 {i} 错误: {e}")
continue
print(f"[DEBUG] 提取 {len(videos)} 个项")
return videos
def homeVideoContent(self):
url = f"{xurl}/category/jrgb/1/"
html = self.fetch_page(url)
videos = self.extract_content(html, url)
return {'list': videos}
def homeContent(self, filter):
result = {'class': []}
categories = [
{"type_id": "/category/jrgb/", "type_name": "最新爆料"},
{"type_id": "/category/rmgb/", "type_name": "精选大瓜"},
{"type_id": "/category/blqw/", "type_name": "猎奇吃瓜"},
{"type_id": "/category/rlph/", "type_name": "TOP5大瓜"},
{"type_id": "/category/ssdbl/", "type_name": "社会热点"},
{"type_id": "/category/hjsq/", "type_name": "海角社区"},
{"type_id": "/category/mrds/", "type_name": "每日大赛"},
{"type_id": "/category/xyss/", "type_name": "校园吃瓜"},
{"type_id": "/category/mxhl/", "type_name": "明星吃瓜"},
{"type_id": "/category/whbl/", "type_name": "网红爆料"},
{"type_id": "/category/bgzq/", "type_name": "反差爆料"},
{"type_id": "/category/fljq/", "type_name": "网黄福利"},
{"type_id": "/category/crfys/", "type_name": "午夜剧场"},
{"type_id": "/category/thjx/", "type_name": "探花经典"},
{"type_id": "/category/dmhv/", "type_name": "禁漫天堂"},
{"type_id": "/category/slec/", "type_name": "吃瓜精选"},
{"type_id": "/category/zksr/", "type_name": "重口调教"},
{"type_id": "/category/crlz/", "type_name": "精选连载"}
]
result['class'] = categories
return result
def categoryContent(self, cid, pg, filter, ext):
url = f"{xurl}{cid}{pg}/" if pg != "1" else f"{xurl}{cid}"
html = self.fetch_page(url)
videos = self.extract_content(html, url)
return {
'list': videos,
'page': pg,
'pagecount': 9999,
'limit': 90,
'total': 999999
}
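# Detail pages embed the stream as "url":"..." inside a player config; strip the escaping backslashes to get a playable link.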
def detailContent(self, ids):
videos = []
did = ids[0]
html = self.fetch_page(did)
if html:
source_match = re.search(r'"url":"(.*?)"', html)
purl = source_match.group(1).replace("\\", "") if source_match else ""
videos.append({
"vod_id": did,
"vod_play_from": "爆料",
"vod_play_url": purl,
"vod_content": re.search(r'og:description" content="(.+?)"', html).group(1) if re.search(r'og:description" content="(.+?)"', html) else ""
})
return {'list': videos}
def playerContent(self, flag, id, vipFlags):
return {"parse": 0, "playUrl": "", "url": id, "header": headerx}
def searchContent(self, key, quick):
return self.searchContentPage(key, quick, "1")
def searchContentPage(self, key, quick, page):
url = f"{xurl}/search/{key}/{page}/"
html = self.fetch_page(url)
videos = self.extract_content(html, url)
return {'list': videos, 'page': page, 'pagecount': 9999, 'limit': 90, 'total': 999999}
def localProxy(self, params):
if params['type'] == "m3u8":
return self.proxyM3u8(params)
elif params['type'] == "media":
return self.proxyMedia(params)
elif params['type'] == "ts":
return self.proxyTs(params)
return None
if __name__ == "__main__":
spider = Spider()
# Test home recommendations
result = spider.homeVideoContent()
print(f"测试首页推荐: {len(result['list'])} 个项")
for item in result['list'][:3]:
print(item)
# Test categories
for cate in ["jrgb", "rmgb", "blqw"]:
result = spider.categoryContent(f"/category/{cate}/", "1", False, {})
print(f"测试分类 {cate}: {len(result['list'])} 个项")
for item in result['list'][:2]:
print(item)

lib/91吃瓜中心.py (new file, 392 lines)

@@ -0,0 +1,392 @@
import json
import re
import sys
import hashlib
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider as BaseSpider
img_cache = {}
class Spider(BaseSpider):
def init(self, extend=""):
try:
self.proxies = json.loads(extend)
except:
self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
self.host = self.get_working_host()
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
print(f"使用站点: {self.host}")
def getName(self):
return "🌈 91吃瓜中心|终极完美版"
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
return False
def destroy(self):
global img_cache
img_cache.clear()
def get_working_host(self):
dynamic_urls = [
'https://but.ybejhul.com/',
'https://adopt.ybejhul.com',
'https://amount.jmpcxulm.com/',
'https://www.91cg1.com/'
]
for url in dynamic_urls:
try:
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
return url
except Exception:
continue
return dynamic_urls[0]
def homeContent(self, filter):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'class': [], 'list': []}
data = self.getpq(response.text)
classes = []
category_selectors = ['.category-list ul li', '.nav-menu li', '.menu li', 'nav ul li']
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
if not href or href == '#' or not name: continue
classes.append({'type_name': name, 'type_id': href})
if classes: break
if not classes:
classes = [{'type_name': '最新', 'type_id': '/latest/'}, {'type_name': '热门', 'type_id': '/hot/'}]
return {'class': classes, 'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'class': [], 'list': []}
def homeVideoContent(self):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'list': []}
def categoryContent(self, tid, pg, filter, extend):
try:
if '@folder' in tid:
v = self.getfod(tid.replace('@folder', ''))
return {'list': v, 'page': 1, 'pagecount': 1, 'limit': 90, 'total': len(v)}
pg = int(pg) if pg else 1
if tid.startswith('http'):
base_url = tid.rstrip('/')
else:
path = tid if tid.startswith('/') else f"/{tid}"
base_url = f"{self.host}{path}".rstrip('/')
if pg == 1:
url = f"{base_url}/"
else:
url = f"{base_url}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article, #index article, article'), tid)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception as e:
return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
def detailContent(self, ids):
try:
url = ids[0] if ids[0].startswith('http') else f"{self.host}{ids[0]}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
data = self.getpq(response.text)
plist = []
used_names = set()
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
try:
config_attr = k.attr('data-config')
if config_attr:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
if video_url:
ep_name = ''
parent = k.parents().eq(0)
for _ in range(4):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text().strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
plist.append(f"{name}${video_url}")
except: continue
if not plist:
content_area = data('.post-content, article')
for i, link in enumerate(content_area('a').items(), start=1):
link_text = link.text().strip()
link_href = link.attr('href')
if link_href and any(kw in link_text for kw in ['点击观看', '观看', '播放', '视频', '第一弹', '第二弹', '第三弹', '第四弹', '第五弹', '第六弹', '第七弹', '第八弹', '第九弹', '第十弹']):
ep_name = link_text.replace('点击观看:', '').replace('点击观看', '').strip()
if not ep_name: ep_name = f"视频{i}"
if not link_href.startswith('http'):
link_href = f"{self.host}{link_href}" if link_href.startswith('/') else f"{self.host}/{link_href}"
plist.append(f"{ep_name}${link_href}")
play_url = '#'.join(plist) if plist else f"未找到视频源${url}"
vod_content = ''
try:
tags = []
seen_names = set()
seen_ids = set()
tag_links = data('.tags a, .keywords a, .post-tags a')
candidates = []
for k in tag_links.items():
title = k.text().strip()
href = k.attr('href')
if title and href:
candidates.append({'name': title, 'id': href})
candidates.sort(key=lambda x: len(x['name']), reverse=True)
for item in candidates:
name = item['name']
id_ = item['id']
if id_ in seen_ids: continue
is_duplicate = False
for seen in seen_names:
if name in seen:
is_duplicate = True
break
if not is_duplicate:
target = json.dumps({'id': id_, 'name': name})
tags.append(f'[a=cr:{target}/]{name}[/a]')
seen_names.add(name)
seen_ids.add(id_)
if tags:
vod_content = ' '.join(tags)
else:
vod_content = data('.post-title').text()
except Exception:
vod_content = '获取标签失败'
if not vod_content:
vod_content = data('h1').text() or '91吃瓜中心'
return {'list': [{'vod_play_from': '91吃瓜中心', 'vod_play_url': play_url, 'vod_content': vod_content}]}
except:
return {'list': [{'vod_play_from': '91吃瓜中心', 'vod_play_url': '获取失败'}]}
def searchContent(self, key, quick, pg="1"):
try:
pg = int(pg) if pg else 1
if pg == 1:
url = f"{self.host}/search/{key}/"
else:
url = f"{self.host}/search/{key}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
return {'list': self.getlist(self.getpq(response.text)('article')), 'page': pg, 'pagecount': 9999}
except:
return {'list': [], 'page': pg, 'pagecount': 9999}
def playerContent(self, flag, id, vipFlags):
parse = 0 if self.isVideoFormat(id) else 1
url = self.proxy(id) if '.m3u8' in id else id
return {'parse': parse, 'url': url, 'header': self.headers}
def localProxy(self, param):
try:
type_ = param.get('type')
url = param.get('url')
if type_ == 'cache':
key = param.get('key')
if content := img_cache.get(key):
return [200, 'image/jpeg', content]
return [404, 'text/plain', b'Expired']
elif type_ == 'img':
real_url = self.d64(url) if not url.startswith('http') else url
res = requests.get(real_url, headers=self.headers, proxies=self.proxies, timeout=10)
content = self.aesimg(res.content)
return [200, 'image/jpeg', content]
elif type_ == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url)
except:
return [404, 'text/plain', b'']
def proxy(self, data, type='m3u8'):
if data and self.proxies: return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
return data
def m3Proxy(self, url):
url = self.d64(url)
res = requests.get(url, headers=self.headers, proxies=self.proxies)
data = res.text
base = res.url.rsplit('/', 1)[0]
lines = []
for line in data.split('\n'):
if '#EXT' not in line and line.strip():
if not line.startswith('http'):
line = f"{base}/{line}"
lines.append(self.proxy(line, 'ts'))
else:
lines.append(line)
return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)]
def tsProxy(self, url):
return [200, 'video/mp2t', requests.get(self.d64(url), headers=self.headers, proxies=self.proxies).content]
def e64(self, text):
return b64encode(str(text).encode()).decode()
def d64(self, text):
return b64decode(str(text).encode()).decode()
def aesimg(self, data):
if len(data) < 16: return data
keys = [(b'f5d965df75336270', b'97b60394abc2fbe1'), (b'75336270f5d965df', b'abc2fbe197b60394')]
for k, v in keys:
try:
dec = unpad(AES.new(k, AES.MODE_CBC, v).decrypt(data), 16)
if dec.startswith(b'\xff\xd8') or dec.startswith(b'\x89PNG'): return dec
except: pass
try:
dec = unpad(AES.new(k, AES.MODE_ECB).decrypt(data), 16)
if dec.startswith(b'\xff\xd8'): return dec
except: pass
return data
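# Cards render as 4:3 landscape posters (style ratio 1.33); vod_id keeps the raw href so detailContent can resolve relative links later.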
def getlist(self, data, tid=''):
videos = []
is_folder = '/mrdg' in (tid or '')
for k in data.items():
card_html = k.outer_html() if hasattr(k, 'outer_html') else str(k)
a = k if k.is_('a') else k('a').eq(0)
href = a.attr('href')
title = k('h2').text() or k('.entry-title').text() or k('.post-title').text()
if not title and k.is_('a'): title = k.text()
if href and title:
img = self.getimg(k('script').text(), k, card_html)
videos.append({
'vod_id': f"{href}{'@folder' if is_folder else ''}",
'vod_name': title.strip(),
'vod_pic': img,
'vod_remarks': k('time').text() or '',
'vod_tag': 'folder' if is_folder else '',
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
videos = []
for i, h2 in enumerate(data('.post-content h2').items()):
p_txt = data('.post-content p').eq(i * 2)
p_img = data('.post-content p').eq(i * 2 + 1)
p_html = p_img.outer_html() if hasattr(p_img, 'outer_html') else str(p_img)
videos.append({
'vod_id': p_txt('a').attr('href'),
'vod_name': p_txt.text().strip(),
'vod_pic': self.getimg('', p_img, p_html),
'vod_remarks': h2.text().strip()
})
return videos
def getimg(self, text, elem=None, html_content=None):
if m := re.search(r"loadBannerDirect\('([^']+)'", text or ''):
return self._proc_url(m.group(1))
if html_content is None and elem is not None:
html_content = elem.outer_html() if hasattr(elem, 'outer_html') else str(elem)
if not html_content: return ''
html_content = html_content.replace('&quot;', '"').replace('&apos;', "'").replace('&amp;', '&')
if 'data:image' in html_content:
m = re.search(r'(data:image/[a-zA-Z0-9+/=;,]+)', html_content)
if m: return self._proc_url(m.group(1))
m = re.search(r'(https?://[^"\'\s)]+\.(?:jpg|png|jpeg|webp))', html_content, re.I)
if m: return self._proc_url(m.group(1))
if 'url(' in html_content:
m = re.search(r'url\s*\(\s*[\'"]?([^"\'\)]+)[\'"]?\s*\)', html_content, re.I)
if m: return self._proc_url(m.group(1))
return ''
def _proc_url(self, url):
if not url: return ''
url = url.strip('\'" ')
if url.startswith('data:'):
try:
_, b64_str = url.split(',', 1)
raw = b64decode(b64_str)
if not (raw.startswith(b'\xff\xd8') or raw.startswith(b'\x89PNG') or raw.startswith(b'GIF8')):
raw = self.aesimg(raw)
key = hashlib.md5(raw).hexdigest()
img_cache[key] = raw
return f"{self.getProxyUrl()}&type=cache&key={key}"
except: return ""
if not url.startswith('http'):
url = f"{self.host}{url}" if url.startswith('/') else f"{self.host}/{url}"
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=img"
def getpq(self, data):
try: return pq(data)
except: return pq(data.encode('utf-8'))

lib/吃瓜网.py (new file, 391 lines)

@@ -0,0 +1,391 @@
import json
import re
import sys
import hashlib
from base64 import b64decode, b64encode
from urllib.parse import urlparse
import requests
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from pyquery import PyQuery as pq
sys.path.append('..')
from base.spider import Spider as BaseSpider
img_cache = {}
class Spider(BaseSpider):
def init(self, extend=""):
try:
self.proxies = json.loads(extend)
except:
self.proxies = {}
self.headers = {
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Accept-Language': 'zh-CN,zh;q=0.9',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
}
self.host = self.get_working_host()
self.headers.update({'Origin': self.host, 'Referer': f"{self.host}/"})
print(f"使用站点: {self.host}")
def getName(self):
return "🌈 吃瓜网|终极完美版"
def isVideoFormat(self, url):
return any(ext in (url or '') for ext in ['.m3u8', '.mp4', '.ts'])
def manualVideoCheck(self):
return False
def destroy(self):
global img_cache
img_cache.clear()
def get_working_host(self):
dynamic_urls = [
'https://cgw.xwrfsps.cc/',
'https://dlx1w76jjz2r7.cloudfront.net/',
'https://cgw321.com/'
]
for url in dynamic_urls:
try:
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=10)
if response.status_code == 200:
return url
except Exception:
continue
return dynamic_urls[0]
def homeContent(self, filter):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'class': [], 'list': []}
data = self.getpq(response.text)
classes = []
category_selectors = ['.category-list ul li', '.nav-menu li', '.menu li', 'nav ul li']
for selector in category_selectors:
for k in data(selector).items():
link = k('a')
href = (link.attr('href') or '').strip()
name = (link.text() or '').strip()
if not href or href == '#' or not name: continue
classes.append({'type_name': name, 'type_id': href})
if classes: break
if not classes:
classes = [{'type_name': '最新', 'type_id': '/latest/'}, {'type_name': '热门', 'type_id': '/hot/'}]
return {'class': classes, 'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'class': [], 'list': []}
def homeVideoContent(self):
try:
response = requests.get(self.host, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': []}
data = self.getpq(response.text)
return {'list': self.getlist(data('#index article, article'))}
except Exception as e:
return {'list': []}
def categoryContent(self, tid, pg, filter, extend):
try:
if '@folder' in tid:
v = self.getfod(tid.replace('@folder', ''))
return {'list': v, 'page': 1, 'pagecount': 1, 'limit': 90, 'total': len(v)}
pg = int(pg) if pg else 1
if tid.startswith('http'):
base_url = tid.rstrip('/')
else:
path = tid if tid.startswith('/') else f"/{tid}"
base_url = f"{self.host}{path}".rstrip('/')
if pg == 1:
url = f"{base_url}/"
else:
url = f"{base_url}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
if response.status_code != 200: return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
data = self.getpq(response.text)
videos = self.getlist(data('#archive article, #index article, article'), tid)
return {'list': videos, 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 999999}
except Exception as e:
return {'list': [], 'page': pg, 'pagecount': 9999, 'limit': 90, 'total': 0}
def detailContent(self, ids):
try:
url = ids[0] if ids[0].startswith('http') else f"{self.host}{ids[0]}"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
data = self.getpq(response.text)
plist = []
used_names = set()
if data('.dplayer'):
for c, k in enumerate(data('.dplayer').items(), start=1):
try:
config_attr = k.attr('data-config')
if config_attr:
config = json.loads(config_attr)
video_url = config.get('video', {}).get('url', '')
if video_url:
ep_name = ''
parent = k.parents().eq(0)
for _ in range(4):
if not parent: break
heading = parent.find('h2, h3, h4').eq(0).text().strip()
if heading:
ep_name = heading
break
parent = parent.parents().eq(0)
base_name = ep_name if ep_name else f"视频{c}"
name = base_name
count = 2
while name in used_names:
name = f"{base_name} {count}"
count += 1
used_names.add(name)
plist.append(f"{name}${video_url}")
except: continue
if not plist:
content_area = data('.post-content, article')
for i, link in enumerate(content_area('a').items(), start=1):
link_text = link.text().strip()
link_href = link.attr('href')
if link_href and any(kw in link_text for kw in ['点击观看', '观看', '播放', '视频', '第一弹', '第二弹', '第三弹', '第四弹', '第五弹', '第六弹', '第七弹', '第八弹', '第九弹', '第十弹']):
ep_name = link_text.replace('点击观看:', '').replace('点击观看', '').strip()
if not ep_name: ep_name = f"视频{i}"
if not link_href.startswith('http'):
link_href = f"{self.host}{link_href}" if link_href.startswith('/') else f"{self.host}/{link_href}"
plist.append(f"{ep_name}${link_href}")
play_url = '#'.join(plist) if plist else f"未找到视频源${url}"
vod_content = ''
try:
tags = []
seen_names = set()
seen_ids = set()
tag_links = data('.tags a, .keywords a, .post-tags a')
candidates = []
for k in tag_links.items():
title = k.text().strip()
href = k.attr('href')
if title and href:
candidates.append({'name': title, 'id': href})
candidates.sort(key=lambda x: len(x['name']), reverse=True)
for item in candidates:
name = item['name']
id_ = item['id']
if id_ in seen_ids: continue
is_duplicate = False
for seen in seen_names:
if name in seen:
is_duplicate = True
break
if not is_duplicate:
target = json.dumps({'id': id_, 'name': name})
tags.append(f'[a=cr:{target}/]{name}[/a]')
seen_names.add(name)
seen_ids.add(id_)
if tags:
vod_content = ' '.join(tags)
else:
vod_content = data('.post-title').text()
except Exception:
vod_content = '获取标签失败'
if not vod_content:
vod_content = data('h1').text() or '吃瓜网'
return {'list': [{'vod_play_from': '吃瓜网', 'vod_play_url': play_url, 'vod_content': vod_content}]}
except:
return {'list': [{'vod_play_from': '吃瓜网', 'vod_play_url': '获取失败'}]}
def searchContent(self, key, quick, pg="1"):
try:
pg = int(pg) if pg else 1
if pg == 1:
url = f"{self.host}/search/{key}/"
else:
url = f"{self.host}/search/{key}/{pg}/"
response = requests.get(url, headers=self.headers, proxies=self.proxies, timeout=15)
return {'list': self.getlist(self.getpq(response.text)('article')), 'page': pg, 'pagecount': 9999}
except:
return {'list': [], 'page': pg, 'pagecount': 9999}
def playerContent(self, flag, id, vipFlags):
parse = 0 if self.isVideoFormat(id) else 1
url = self.proxy(id) if '.m3u8' in id else id
return {'parse': parse, 'url': url, 'header': self.headers}
def localProxy(self, param):
try:
type_ = param.get('type')
url = param.get('url')
if type_ == 'cache':
key = param.get('key')
if content := img_cache.get(key):
return [200, 'image/jpeg', content]
return [404, 'text/plain', b'Expired']
elif type_ == 'img':
real_url = self.d64(url) if not url.startswith('http') else url
res = requests.get(real_url, headers=self.headers, proxies=self.proxies, timeout=10)
content = self.aesimg(res.content)
return [200, 'image/jpeg', content]
elif type_ == 'm3u8':
return self.m3Proxy(url)
else:
return self.tsProxy(url)
except:
return [404, 'text/plain', b'']
def proxy(self, data, type='m3u8'):
if data and self.proxies: return f"{self.getProxyUrl()}&url={self.e64(data)}&type={type}"
return data
def m3Proxy(self, url):
url = self.d64(url)
res = requests.get(url, headers=self.headers, proxies=self.proxies)
data = res.text
base = res.url.rsplit('/', 1)[0]
lines = []
for line in data.split('\n'):
if '#EXT' not in line and line.strip():
if not line.startswith('http'):
line = f"{base}/{line}"
lines.append(self.proxy(line, 'ts'))
else:
lines.append(line)
return [200, "application/vnd.apple.mpegurl", '\n'.join(lines)]
def tsProxy(self, url):
return [200, 'video/mp2t', requests.get(self.d64(url), headers=self.headers, proxies=self.proxies).content]
def e64(self, text):
return b64encode(str(text).encode()).decode()
def d64(self, text):
return b64decode(str(text).encode()).decode()
def aesimg(self, data):
if len(data) < 16: return data
keys = [(b'f5d965df75336270', b'97b60394abc2fbe1'), (b'75336270f5d965df', b'abc2fbe197b60394')]
for k, v in keys:
try:
dec = unpad(AES.new(k, AES.MODE_CBC, v).decrypt(data), 16)
if dec.startswith(b'\xff\xd8') or dec.startswith(b'\x89PNG'): return dec
except: pass
try:
dec = unpad(AES.new(k, AES.MODE_ECB).decrypt(data), 16)
if dec.startswith(b'\xff\xd8'): return dec
except: pass
return data
def getlist(self, data, tid=''):
videos = []
is_folder = '/mrdg' in (tid or '')
for k in data.items():
card_html = k.outer_html() if hasattr(k, 'outer_html') else str(k)
a = k if k.is_('a') else k('a').eq(0)
href = a.attr('href')
title = k('h2').text() or k('.entry-title').text() or k('.post-title').text()
if not title and k.is_('a'): title = k.text()
if href and title:
img = self.getimg(k('script').text(), k, card_html)
videos.append({
'vod_id': f"{href}{'@folder' if is_folder else ''}",
'vod_name': title.strip(),
'vod_pic': img,
'vod_remarks': k('time').text() or '',
'vod_tag': 'folder' if is_folder else '',
'style': {"type": "rect", "ratio": 1.33}
})
return videos
def getfod(self, id):
url = f"{self.host}{id}"
data = self.getpq(requests.get(url, headers=self.headers, proxies=self.proxies).text)
videos = []
for i, h2 in enumerate(data('.post-content h2').items()):
p_txt = data('.post-content p').eq(i * 2)
p_img = data('.post-content p').eq(i * 2 + 1)
p_html = p_img.outer_html() if hasattr(p_img, 'outer_html') else str(p_img)
videos.append({
'vod_id': p_txt('a').attr('href'),
'vod_name': p_txt.text().strip(),
'vod_pic': self.getimg('', p_img, p_html),
'vod_remarks': h2.text().strip()
})
return videos
def getimg(self, text, elem=None, html_content=None):
if m := re.search(r"loadBannerDirect\('([^']+)'", text or ''):
return self._proc_url(m.group(1))
if html_content is None and elem is not None:
html_content = elem.outer_html() if hasattr(elem, 'outer_html') else str(elem)
if not html_content: return ''
html_content = html_content.replace('&quot;', '"').replace('&apos;', "'").replace('&amp;', '&')
if 'data:image' in html_content:
m = re.search(r'(data:image/[a-zA-Z0-9+/=;,]+)', html_content)
if m: return self._proc_url(m.group(1))
m = re.search(r'(https?://[^"\'\s)]+\.(?:jpg|png|jpeg|webp))', html_content, re.I)
if m: return self._proc_url(m.group(1))
if 'url(' in html_content:
m = re.search(r'url\s*\(\s*[\'"]?([^"\'\)]+)[\'"]?\s*\)', html_content, re.I)
if m: return self._proc_url(m.group(1))
return ''
def _proc_url(self, url):
if not url: return ''
url = url.strip('\'" ')
if url.startswith('data:'):
try:
_, b64_str = url.split(',', 1)
raw = b64decode(b64_str)
if not (raw.startswith(b'\xff\xd8') or raw.startswith(b'\x89PNG') or raw.startswith(b'GIF8')):
raw = self.aesimg(raw)
key = hashlib.md5(raw).hexdigest()
img_cache[key] = raw
return f"{self.getProxyUrl()}&type=cache&key={key}"
except: return ""
if not url.startswith('http'):
url = f"{self.host}{url}" if url.startswith('/') else f"{self.host}/{url}"
return f"{self.getProxyUrl()}&url={self.e64(url)}&type=img"
def getpq(self, data):
try: return pq(data)
except: return pq(data.encode('utf-8'))
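# --- Minimal local smoke test: a sketch, assuming base.spider resolves from the parent
# directory and that getProxyUrl() is usable outside the host app ---
if __name__ == '__main__':
    spider = Spider()
    spider.init()
    home = spider.homeContent(False)
    print(f"classes: {len(home['class'])}, items: {len(home['list'])}")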