# Python 爬虫 (Python web scraper)

import json
import time
from typing import Any, Dict, List, Optional

import requests

class VideoScraper:
    """Scraper for the site's video listing and per-video episode endpoints.

    Listing pages are fetched from ``list_url``, de-duplicated by video id,
    optionally enriched with the response of ``episode_url`` and written to a
    JSON file. Progress and errors are reported with ``print``; request and
    decode failures are reported and treated as "no data" instead of raised.
    """

    def __init__(self, list_delay: float = 1.0, episode_delay: float = 0.5):
        """Set up request headers, endpoint URLs and the result buffer.

        Args:
            list_delay: Seconds to sleep after each listing-page request.
            episode_delay: Seconds to sleep after each episode request.
        """
        # Headers imitating the WeChat in-app browser; kept identical to the
        # ones used in test_categories.py.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Linux; Android 12; MI 12 Build/SKQ1.211230.001; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/89.0.4389.72 MQQBrowser/6.2 TBS/046014 Mobile Safari/537.36 MicroMessenger/8.0.27.2220(0x28001B51) Process/appbrand NetType/WIFI Language/zh_CN ABI/arm64 MiniProgramEnv/android',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'X-Requested-With': 'XMLHttpRequest',
            'Referer': 'https://xxx.xxxx.top/',
            'Origin': 'https://xxx.xxxx.top',
            'Sec-Fetch-Dest': 'empty',
            'Sec-Fetch-Mode': 'cors',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Ch-Ua': '"Not A;Brand";v="99", "Chromium";v="90"',
            'Sec-Ch-Ua-Mobile': '?1'
        }

        # API endpoints.
        self.list_url = 'https://xxx.xxxx.top/ajaxHomeMore.html'
        self.episode_url = 'https://xxxx.xxxx.top/api/episode.html'

        # Pauses between consecutive requests, to avoid hammering the server.
        self.list_delay = list_delay
        self.episode_delay = episode_delay

        # Raw video dicts collected from the listing endpoint.
        self.all_videos: List[Dict[str, Any]] = []

    def _post_json(self, url: str, data: Dict[str, Any], headers: Dict[str, str],
                   error_prefix: str) -> Dict[str, Any]:
        """POST form data and return the decoded JSON object.

        Returns an empty dict (after printing ``error_prefix`` and the cause)
        when the request fails, the status is an HTTP error, the body is not
        valid JSON, or the decoded JSON is not an object.
        """
        try:
            response = requests.post(url, data=data, headers=headers, timeout=10)
            response.raise_for_status()
            payload = response.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decode errors raised by older requests versions.
            print(f'{error_prefix}: {e}')
            return {}

        if not isinstance(payload, dict):
            # Callers use dict methods on the result, so reject lists/scalars here.
            print(f'{error_prefix}: 响应不是JSON对象 ({type(payload).__name__})')
            return {}
        return payload

    def get_video_list(self, page: int = 1, cid: int = 0, mid: int = 0, area: int = 0, year: int = 0, lang: int = 0) -> Dict[str, Any]:
        """Fetch one page of the video listing.

        The request is sent the same way as in test_categories.py: a form POST
        carrying the page number and the five filter ids.

        Returns:
            The decoded JSON object, or ``{}`` if the request failed.
        """
        data = {'page': page, 'cid': cid, 'mid': mid, 'area': area, 'year': year, 'lang': lang}
        return self._post_json(self.list_url, data, self.headers, '获取视频列表失败')

    def get_episode_info(self, video_id: int) -> Dict[str, Any]:
        """Fetch the episode information for a single video.

        Returns:
            The decoded JSON object, or ``{}`` if the request failed.
        """
        data = {'video_id': video_id}

        # Minimal headers taken from video.py; the Referer is the video's play page.
        episode_headers = {
            'User-Agent': 'Mozilla/5.0 (Linux; Android 12) AppleWebKit/537.36',
            'Referer': f'https://xxx.xxxx.top/play.html?id={video_id}'
        }

        return self._post_json(self.episode_url, data, episode_headers,
                               f'获取视频{video_id}集数信息失败')

    @staticmethod
    def _extract_video_list(result: Any) -> Optional[List[Any]]:
        """Return ``result['info']['data']`` if that path exists and is a list, else None."""
        if not isinstance(result, dict):
            return None
        info = result.get('info')
        if not isinstance(info, dict):
            return None
        data = info.get('data')
        return data if isinstance(data, list) else None

    def get_all_categories_videos(self, max_pages: int = 2) -> List[Dict[str, Any]]:
        """Fetch up to ``max_pages`` listing pages for every configured category.

        Each video dict found is tagged with ``category_name`` and
        ``category_params`` and appended to ``self.all_videos`` (results
        accumulate across calls). Pages with a missing or malformed payload
        are reported and skipped.

        Returns:
            ``self.all_videos``.
        """
        print('开始获取所有分类的视频...')

        # Category filters as defined by the site; only the combination known
        # to work is enabled for now.
        categories = [
            {'cid': 0, 'mid': 0, 'area': 0, 'year': 0, 'lang': 0, 'name': 'بارلىق'},
        ]

        for category in categories:
            category_name = category['name']
            params = {k: v for k, v in category.items() if k != 'name'}

            print(f'\n正在获取分类: {category_name}')
            print(f'  参数: {params}')

            for page in range(1, max_pages + 1):
                print(f'  正在获取第 {page} 页...')

                result = self.get_video_list(page=page, **params)
                raw_videos = self._extract_video_list(result)

                if raw_videos is not None:
                    print(f'    原始响应视频数量: {len(raw_videos)}')
                    # Entries that are not JSON objects cannot be tagged or read later.
                    videos = [v for v in raw_videos if isinstance(v, dict)]
                    for video in videos:
                        # Attach the category the video was found under.
                        video['category_name'] = category_name
                        video['category_params'] = dict(params)
                        self.all_videos.append(video)

                    print(f'    ✓ 获取到 {len(videos)} 个视频')
                    print(f'    当前总视频数: {len(self.all_videos)}')
                    if videos:
                        print(f'      第一个视频: {videos[0].get("title", "未知标题")}')
                else:
                    print(f'    ✗ 第 {page} 页获取失败或无数据')
                    if result:
                        print(f'      响应keys: {list(result.keys()) if isinstance(result, dict) else "not dict"}')
                        info = result.get('info') if isinstance(result, dict) else None
                        if isinstance(info, dict) and 'data' in info:
                            # Present but not a list: show what arrived instead.
                            print(f'      info.data类型: {type(info["data"]).__name__}')

                # Pause between pages so requests are not sent too quickly.
                if self.list_delay > 0:
                    time.sleep(self.list_delay)

        print(f'\n总共获取到 {len(self.all_videos)} 个视频')
        return self.all_videos

    @staticmethod
    def _build_video_record(video: Dict[str, Any]) -> Dict[str, Any]:
        """Map a raw listing entry to the output record, with episode info marked unavailable."""
        return {
            'video_id': video.get('id'),
            'title': video.get('title', '未知标题'),
            'cover': video.get('cover', ''),
            'description': video.get('description', ''),
            'rating': video.get('star', '0'),
            'views': video.get('view', 0),
            'likes': video.get('good', 0),
            'category': {
                'name': video.get('category_name', '未知'),
                'params': video.get('category_params', {})
            },
            'metadata': {
                'type_id': video.get('mid', 0),
                'area_id': video.get('area', 0),
                'year_id': video.get('year', 0),
                'lang_id': video.get('lang', 0),
                'is_vip': bool(video.get('is_vip', 0)),
                'status': video.get('status', 0)
            },
            'pricing': {
                'price': video.get('price', '0.00'),
                'episode_count': video.get('episode', 0)
            },
            'timestamps': {
                'created': video.get('create_time', ''),
                'updated': video.get('update_time', '')
            },
            'episode_info': {
                'available': False,
                'episodes': [],
                'total_episodes': 0
            }
        }

    @staticmethod
    def _parse_episode_info(response: Any) -> Optional[Dict[str, Any]]:
        """Convert an episode response into the ``episode_info`` record.

        Returns None unless the response is an object with ``code == 1``.
        A missing or malformed ``data``/``episodes`` field is treated as an
        empty episode list.
        """
        if not isinstance(response, dict) or response.get('code') != 1:
            return None

        episode_data = response.get('data')
        if not isinstance(episode_data, dict):
            episode_data = {}

        episodes = episode_data.get('episodes', [])
        if not isinstance(episodes, (list, dict)):
            episodes = []

        return {
            'available': True,
            'episodes': episodes,
            'total_episodes': len(episodes),
            'current_episode': episode_data.get('current', 0),
            'next_episode': episode_data.get('next', 0)
        }

    def enrich_videos_with_episodes(self, max_videos: Optional[int] = 10) -> List[Dict[str, Any]]:
        """Build output records for the collected videos and attach episode info.

        Args:
            max_videos: Only the first ``max_videos`` entries of
                ``self.all_videos`` are processed (default 10, the original
                test limit). Pass ``None`` to process every video.

        Returns:
            One record per processed video. ``episode_info['available']`` is
            True only when the episode endpoint answered with ``code == 1``.
        """
        print('\n开始获取视频集数信息...')

        if max_videos is None:
            test_videos = self.all_videos
        else:
            # Clamp so a negative limit does not turn into "all but the last N".
            test_videos = self.all_videos[:max(max_videos, 0)]

        enriched_videos = []
        total = len(test_videos)

        for position, video in enumerate(test_videos, start=1):
            enriched_video = self._build_video_record(video)
            video_id = enriched_video['video_id']

            print(f'({position}/{total}) 正在获取视频: {enriched_video["title"]} (ID: {video_id})')

            episode_record = self._parse_episode_info(self.get_episode_info(video_id))
            if episode_record is not None:
                enriched_video['episode_info'] = episode_record
                print(f'  ✓ 获取到 {episode_record["total_episodes"]} 集信息')
            else:
                print('  ✗ 无法获取集数信息 (可能需要VIP)')

            enriched_videos.append(enriched_video)

            # Pause between videos so requests are not sent too quickly.
            if self.episode_delay > 0:
                time.sleep(self.episode_delay)

        return enriched_videos

    def save_to_json(self, data: List[Dict[str, Any]], filename: str = 'video.json'):
        """Write ``data`` to ``filename`` as indented UTF-8 JSON.

        The data is serialised before the file is opened, so a serialisation
        error leaves an existing file untouched. Failures are printed, not
        raised. The reported size is the number of characters written.
        """
        try:
            serialized = json.dumps(data, ensure_ascii=False, indent=2)
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(serialized)
        except (OSError, TypeError, ValueError) as e:
            print(f'保存文件失败: {e}')
            return

        print(f'\n数据已保存到 {filename}')
        print(f'文件大小: {len(serialized)} 字符')

    def run(self, max_pages: int = 2, get_episodes: bool = True, max_videos: Optional[int] = 10):
        """Run the whole pipeline: list, de-duplicate, enrich, save, summarise.

        Args:
            max_pages: Number of listing pages to fetch per category.
            get_episodes: When True, fetch episode info per video; otherwise
                save a reduced record that embeds the raw listing entry.
            max_videos: Limit passed to ``enrich_videos_with_episodes``
                (``None`` for no limit); ignored when ``get_episodes`` is False.
        """
        print('=== 视频爬虫启动 ===')

        videos = self.get_all_categories_videos(max_pages)

        if not videos:
            print('未获取到任何视频数据')
            return

        # De-duplicate by id, keeping the first occurrence; entries with a
        # missing or falsy id are dropped.
        unique_videos = {}
        for video in videos:
            video_id = video.get('id')
            if video_id:
                unique_videos.setdefault(video_id, video)

        self.all_videos = list(unique_videos.values())
        print(f'\n去重后剩余 {len(self.all_videos)} 个视频')

        if get_episodes:
            enriched_videos = self.enrich_videos_with_episodes(max_videos)
        else:
            enriched_videos = [{
                'video_id': v.get('id'),
                'title': v.get('title'),
                'cover': v.get('cover'),
                'category': v.get('category_name'),
                'raw_data': v
            } for v in self.all_videos]

        self.save_to_json(enriched_videos)

        print('\n=== 爬取完成 ===')
        print(f'总共处理了 {len(enriched_videos)} 个视频')

        # Summary counts exist only on the enriched record shape.
        if get_episodes:
            vip_count = sum(1 for v in enriched_videos if v['metadata']['is_vip'])
            episodes_available = sum(1 for v in enriched_videos if v['episode_info']['available'])

            print(f'VIP视频: {vip_count}')
            print(f'可获取集数: {episodes_available}')

if __name__ == '__main__':
    # Crawl the first two listing pages and fetch episode details as well.
    VideoScraper().run(max_pages=2, get_episodes=True)
# © 版权声明
# THE END
# 喜欢就支持一下吧
# 点赞14 分享
# 评论 抢沙发
#
# 请登录后发表评论
#
#     暂无评论内容