import requests
import json
import time
from typing import Dict, List, Any

class VideoScraper:
    def __init__(self):
        # WeChat-environment request headers -- identical to test_categories.py
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Linux; Android 12; MI 12 Build/SKQ1.211230.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/89.0.4389.72 MQQBrowser/6.2 TBS/046014 Mobile Safari/537.36 MicroMessenger/8.0.27.2220(0x28001B51) Process/appbrand NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'X-Requested-With': 'XMLHttpRequest',
            'Referer': 'https://xxx.xxxx.top/',
            'Origin': 'https://xxx.xxxx.top',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Ch-Ua': '"Not A;Brand";v="99", "Chromium";v="90"',
            'Sec-Ch-Ua-Mobile': '?1'
        }
        # API endpoints
        self.list_url = 'https://xxx.xxxx.top/ajaxHomeMore.html'
        self.episode_url = 'https://xxxx.xxxx.top/api/episode.html'
        # Holds all collected video records
        self.all_videos = []
    def get_video_list(self, page: int = 1, cid: int = 0, mid: int = 0, area: int = 0, year: int = 0, lang: int = 0) -> Dict[str, Any]:
        """Fetch one page of the video list, using the same request style as test_categories.py."""
        data = {'page': page, 'cid': cid, 'mid': mid, 'area': area, 'year': year, 'lang': lang}
        try:
            response = requests.post(self.list_url, data=data, headers=self.headers, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f'Failed to fetch video list: {e}')
            return {}
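
    # Inferred shape of a successful list response, judging only from how
    # get_all_categories_videos() reads it below (result['info']['data']).
    # This is an assumption reconstructed from the parsing code, not a
    # documented API contract:
    #
    #   {
    #       "info": {
    #           "data": [
    #               {"id": 123, "title": "...", "cover": "...", "star": "9.0", ...}
    #           ]
    #       }
    #   }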
    def get_episode_info(self, video_id: int) -> Dict[str, Any]:
        """Fetch episode information for a single video."""
        data = {'video_id': video_id}
        # Use the request headers from video.py
        episode_headers = {
            'User-Agent': 'Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36',
            'Referer': f'https://xxx.xxxx.top/play.html?id={video_id}'
        }
        try:
            response = requests.post(self.episode_url, data=data, headers=episode_headers, timeout=10)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print(f'Failed to fetch episode info for video {video_id}: {e}')
            return {}
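
    # Inferred shape of a successful episode response, judging only from how
    # enrich_videos_with_episodes() reads it below (code == 1, data.episodes,
    # data.current, data.next). Again an assumption, not a documented contract:
    #
    #   {
    #       "code": 1,
    #       "data": {"episodes": [...], "current": 0, "next": 0}
    #   }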
    def get_all_categories_videos(self, max_pages: int = 2) -> List[Dict[str, Any]]:
        """Fetch videos from every category."""
        print('Fetching videos from all categories...')
        # Categories as defined by the site itself; for now, only test the one
        # known-working category ('بارلىق' is Uyghur for "all")
        categories = [
            {'cid': 0, 'mid': 0, 'area': 0, 'year': 0, 'lang': 0, 'name': 'بارلىق'},
        ]
        for category in categories:
            category_name = category['name']
            params = {k: v for k, v in category.items() if k != 'name'}
            print(f'\nFetching category: {category_name}')
            print(f'  Params: {params}')
            for page in range(1, max_pages + 1):
                print(f'  Fetching page {page}...')
                result = self.get_video_list(page=page, **params)
                if result and 'info' in result and 'data' in result['info']:
                    videos = result['info'].get('data', [])
                    print(f'  Videos in raw response: {len(videos)}')
                    for video in videos:
                        # Attach category info to each record
                        video['category_name'] = category_name
                        video['category_params'] = params
                        self.all_videos.append(video)
                    print(f'  ✓ Got {len(videos)} videos')
                    print(f'  Running total: {len(self.all_videos)}')
                    if videos:
                        print(f'  First video: {videos[0].get("title", "unknown title")}')
                else:
                    print(f'  ✗ Page {page} failed or returned no data')
                    if result:
                        print(f'  Response keys: {list(result.keys()) if isinstance(result, dict) else "not a dict"}')
                        if 'info' in result:
                            info = result['info']
                            if isinstance(info, dict) and 'data' in info:
                                print(f'  Length of info["data"]: {len(info["data"])}')
                # Throttle so we do not hit the server too fast
                time.sleep(1)
        print(f'\nFetched {len(self.all_videos)} videos in total')
        return self.all_videos
    def enrich_videos_with_episodes(self) -> List[Dict[str, Any]]:
        """Attach episode information to each video."""
        print('\nFetching episode information...')
        enriched_videos = []
        # Only process the first 10 videos while testing
        test_videos = self.all_videos[:10]
        for i, video in enumerate(test_videos):
            video_id = video.get('id')
            video_title = video.get('title', 'unknown title')
            print(f'({i+1}/{len(test_videos)}) Fetching video: {video_title} (ID: {video_id})')
            # Build the cleaned-up video record
            enriched_video = {
                'video_id': video_id,
                'title': video_title,
                'cover': video.get('cover', ''),
                'description': video.get('description', ''),
                'rating': video.get('star', '0'),
                'views': video.get('view', 0),
                'likes': video.get('good', 0),
                'category': {
                    'name': video.get('category_name', 'unknown'),
                    'params': video.get('category_params', {})
                },
                'metadata': {
                    'type_id': video.get('mid', 0),
                    'area_id': video.get('area', 0),
                    'year_id': video.get('year', 0),
                    'lang_id': video.get('lang', 0),
                    'is_vip': bool(video.get('is_vip', 0)),
                    'status': video.get('status', 0)
                },
                'pricing': {
                    'price': video.get('price', '0.00'),
                    'episode_count': video.get('episode', 0)
                },
                'timestamps': {
                    'created': video.get('create_time', ''),
                    'updated': video.get('update_time', '')
                },
                'episode_info': {
                    'available': False,
                    'episodes': [],
                    'total_episodes': 0
                }
            }
            # Fetch this video's episode info
            episode_info = self.get_episode_info(video_id)
            # Merge it in if the API reported success
            if episode_info and episode_info.get('code') == 1:
                episode_data = episode_info.get('data', {})
                enriched_video['episode_info'] = {
                    'available': True,
                    'episodes': episode_data.get('episodes', []),
                    'total_episodes': len(episode_data.get('episodes', [])),
                    'current_episode': episode_data.get('current', 0),
                    'next_episode': episode_data.get('next', 0)
                }
                print(f'  ✓ Got info for {len(episode_data.get("episodes", []))} episodes')
            else:
                print('  ✗ Could not fetch episode info (may require VIP)')
            enriched_videos.append(enriched_video)
            # Throttle so we do not hit the server too fast
            time.sleep(0.5)
        return enriched_videos
    def save_to_json(self, data: List[Dict[str, Any]], filename: str = 'video.json'):
        """Save the data to a JSON file."""
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            print(f'\nData saved to {filename}')
            print(f'File size: {len(json.dumps(data, ensure_ascii=False))} characters')
        except Exception as e:
            print(f'Failed to save file: {e}')
    def run(self, max_pages: int = 2, get_episodes: bool = True):
        """Run the full scraping pipeline."""
        print('=== Video scraper starting ===')
        # Fetch the video lists
        videos = self.get_all_categories_videos(max_pages)
        if not videos:
            print('No video data was fetched')
            return
        # Deduplicate by video id
        unique_videos = {}
        for video in videos:
            video_id = video.get('id')
            if video_id and video_id not in unique_videos:
                unique_videos[video_id] = video
        self.all_videos = list(unique_videos.values())
        print(f'\n{len(self.all_videos)} videos remain after deduplication')
        # Fetch episode information
        if get_episodes:
            enriched_videos = self.enrich_videos_with_episodes()
        else:
            enriched_videos = [{
                'video_id': v.get('id'),
                'title': v.get('title'),
                'cover': v.get('cover'),
                'category': v.get('category_name'),
                'raw_data': v
            } for v in self.all_videos]
        # Save to file
        self.save_to_json(enriched_videos)
        print('\n=== Scrape finished ===')
        print(f'Processed {len(enriched_videos)} videos in total')
        # Summary statistics
        if get_episodes:
            vip_count = sum(1 for v in enriched_videos if v['metadata']['is_vip'])
            episodes_available = sum(1 for v in enriched_videos if v['episode_info']['available'])
            print(f'VIP videos: {vip_count}')
            print(f'Episode info available: {episodes_available}')

if __name__ == '__main__':
    scraper = VideoScraper()
    # Run the scraper: fetch the first 2 pages of videos, including episode info
    scraper.run(max_pages=2, get_episodes=True)
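
# To collect list metadata only and skip the slower per-video episode
# requests, the same entry point can be called with different flags, e.g.:
#
#   scraper.run(max_pages=5, get_episodes=False)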