当前位置: 首页 > news >正文

多进程优化顶点小说爬虫:加速爬取速度的深度优化策略

顶点小说进阶(多进程+协程)

建议:

看之前可以先看我之前发布的文章(异步优化与数据入库: 顶点小说爬虫进阶实战)

这篇文章基于上篇文章:进行了多进程处理,大大加快了爬取速度

案例:顶点小说完善(多进程)

优化思路:

  1. 导包:from multiprocessing import Pool
  2. 对于每一页的所有小说采用一个进程,建立进程池,for循环处向进程池添加任务(对于每一页的所有小说的处理封装成一个方法作为任务添加到进程池)
import asyncio
import logging
import time
import requests
from lxml import etree
import aiohttp
import aiomysql
from aiohttp import ContentTypeError
from multiprocessing import PoolCONCURRENCY = 4logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s : %(message)s')class Spider(object):def __init__(self):# 方便设置头部信息、代理IP、cookie信息等self.session = None# 设置协程数量self.semaphore = asyncio.Semaphore(CONCURRENCY)# 限制协程的并发数:# 如果并发数没有达到限制: 那么async with semaphore会瞬间执行完成,进入里面的正式代码中# 如果并发数已经达到了限制,那么其他的协程对象会阻塞在asyn with semaphore这个地方,直到正在运行的某个协程对象完成了,退出了,才会放行一个新的协程对象去替换掉这个已经完成的协程对象# 初始化数据库连接池async def init_pool(self):self.pool = await aiomysql.create_pool(host="127.0.0.1",port=3306,user="root",password="123456",db=f"dingdian",autocommit=True  # Ensure autocommit is set to True for aiomysql)# 在 aiomysql.create_pool 方法中,不需要显式传递 loop 参数。aiomysql 会自动使用当前的事件循环(即默认的全局事件循环)。# 关闭数据库连接池async def close_pool(self):if self.pool:self.pool.close()await self.pool.wait_closed()# 获取url源码async def scrape_api(self, url):# 设置协程数量async with self.semaphore:logging.info(f"scraping {url}")try:async with self.session.get(url) as response:# 控制爬取(或请求)的速率,以避免对目标服务器造成过多的负荷或请求频率过高而被封禁或限制访问。await asyncio.sleep(1)# 在异步环境中,可能需要使用 response.content.read() 或 await response.text() 来获取文本内容。return await response.text()except ContentTypeError as e:  # aiohttp 的 ContentTypeError 异常: 请求内容类型错误 或者 响应内容类型错误# exc_info=True 参数将导致 logging 模块记录完整的异常信息,包括栈跟踪,这对于调试非常有用。logging.error(f'error occurred while scraping {url}', exc_info=True)# 获取小说分类urlasync def get_type(self):url = "https://www.cdbxs.com/sort/"source = await self.scrape_api(url)href_lists = etree.HTML(source).xpath('//ul[@class="nav"]/li/a/@href')[2:-4]type_lists = []for href in href_lists:type_lists.append(f"{url}{href.split('/')[2]}/1/")# print(type_lists)return type_lists# 获取最大页async def get_max_page(self, first_page_url):source = await self.scrape_api(first_page_url)# print(source)max_page = etree.HTML(source).xpath('//a[13]/text()')return max_page# 获取小说列表页信息async def get_book_info(self, every_page_url):source = await self.scrape_api(every_page_url)book_lists = []lis = etree.HTML(source).xpath("//ul[@class='txt-list txt-list-row5']/li")for li in lis:book_id_url = li.xpath("span[@class='s2']/a/@href")[0]book_id = book_id_url.split('/')[3]# 书名book_name = li.xpath("span[@class='s2']/a/text()")[0]# 最新章节new_chapter = li.xpath("span[@class='s3']/a/text()")[0]# 作者author = li.xpath("span[@class='s4']/text()")[0]# 更新时间update_time = li.xpath("span[@class='s5']/text()")[0]source = await self.scrape_api(f"https://www.cdbxs.com{book_id_url}")# 字数font_num = etree.HTML(source).xpath("//p[6]/span/text()")[0]# 摘要summary = etree.HTML(source).xpath("//div[@class='desc xs-hidden']/text()")[0]# 以元组添加至 book_lists# print((book_id, book_name, new_chapter, author, update_time, font_num, summary))book_lists.append((book_id, book_name, new_chapter, author, update_time, font_num, summary))return book_lists# 获取章节urlsasync def get_chapter_urls(self, chapter_list_url):source = await self.scrape_api(chapter_list_url)# 章节urlchapter_urls = map(lambda x: "https://www.cdbxs.com" + x, etree.HTML(source).xpath("//div[@class='section-box'][2]/ul[@class='section-list fix']/li/a/@href | //div[@class='section-box'][1]/ul[@class='section-list fix']/li/a/@href"))return chapter_urls# 获取章节详情信息async def get_chapter_info(self, chapter_url):source = await self.scrape_api(chapter_url)# 标题title = etree.HTML(source).xpath("//h1[@class='title']/text()")# 正文content = ''.join(etree.HTML(source).xpath("//div[@id='nb_content']/dd//text()"))if title:return f'\'{title[0]}\'', f'\'{content}\''else:return '', f'\'{content}\''# 入库async def save_to_mysql(self, table_name, table_column_str, table_info_str):async with self.pool.acquire() as conn:async with conn.cursor() as cursor:sql = f'insert into {table_name}({table_column_str}) values{table_info_str}'# 执行SQL语句await cursor.execute(sql)await conn.commit()async def main(self):# headersglobal poolheaders = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0"}# 建立异步请求需要的session(主要加header头信息以及代理,cookie等头信息)self.session = aiohttp.ClientSession(headers=headers)# 获取小说分类urltype_lists = await self.get_type()# 分类url默认为第一页for first_page_url in type_lists:# 获取带分类的url的前半截type_url = first_page_url.split('1')[0]# 获取此分类下最大页max_page = await self.get_max_page(first_page_url)# 生成此分类下每一页urlfor every_page in range(1, int(max_page[0]) + 1):every_page_url = f"{type_url}{every_page}/"# 获取小说列表页信息book_info_lists = await self.get_book_info(every_page_url)# 创建进程池pool = Pool(16)for book_info in book_info_lists:# 多进程抓取每本小说pool.apply_async(await self.run(book_info))# 关闭进程池,即停止接受新的任务。pool.close()# 等待所有的子进程执行结束。它会阻塞主进程,直到进程池中所有的任务都被执行完毕,然后才会继续执行主进程后面的代码。# 调用 join() 方法之前,应该先调用 close() 方法来确保不会再有新的任务被提交进来。pool.join()# 关闭连接池self.close_pool()# 关闭连接await self.session.close()# run方法: 抓取每一本小说的所有章节async def run(self, book_info):print(f"爬取小说:{book_info[1]}...")# 初始化数据库连接池await self.init_pool()# 入库小说信息await self.save_to_mysql('books','book_id, book_name, new_chapter, author, update_time, font_num, summary',book_info)# 获取章节urlsbook_id = book_info[0]chapter_urls = await self.get_chapter_urls(f"https://www.cdbxs.com/booklist/b/{book_id}/1")# 多协程抓取小说各个章节# 生成scrape_detail任务列表scrape_detail_tasks = [asyncio.ensure_future(self.get_chapter_info(chapter_url)) for chapter_url inchapter_urls]# 并发执行任务,获取结果chapter_details = list(await asyncio.gather(*scrape_detail_tasks))  # await asyncio.gather(*scrape_detail_tasks生成元组# 入库# 1.添加book_id 到 chapter_detailfor i in range(len(chapter_details)):chapter_detail = list(chapter_details[i])chapter_detail.append(book_id)chapter_detail = tuple(chapter_detail)chapter_details[i] = chapter_detail# 2.保存至数据库[await self.save_to_mysql('chapters', 'chapter_name,chapter_content, bid',chapter_detail) for chapter_detail in chapter_details]
if __name__ == '__main__':# 开始时间start_time = time.time()# 初始化Spiderspider = Spider()# 创建事件循环池loop = asyncio.get_event_loop()# 注册loop.run_until_complete(spider.main())# 结束时间end_time = time.time()logging.info(f'total time: {end_time - start_time}')

后续发布爬虫更多精致内容(按某培训机构爬虫课程顺序发布,欢迎关注后续发布)

更多精致内容

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 二进制部署k8s
  • 异步日志:性能优化的金钥匙
  • Vue封装Tooltip(提示工具)
  • 产业知识图谱:金融科技的创新引擎
  • MFC扩展库BCGControlBar Pro v35.0 - 可视化管理主题等全新升级
  • 在大型企业级应用中,如何优化 XML 数据的存储和检索效率,以满足高并发访问需求?
  • Python打开Excel文档并读取数据
  • Grafana
  • 光学传感器图像处理流程(一)
  • 怎么用PPT录制微课?详细步骤解析!
  • IMS架构中的注册与会话流程:RTPEngine集成及消息路由详解
  • sqlalchemy.orm中validates对两个字段进行联合校验
  • 浅析Kafka Streams中KTable.aggregate()方法的使用
  • 【活动预告】Apache IoTDB TsFile 智慧能源应用“上会”啦!
  • 【中级通信考试】-动力与环境专业:第四章 机房空调系统
  • 【跃迁之路】【477天】刻意练习系列236(2018.05.28)
  • Angular 响应式表单 基础例子
  • co模块的前端实现
  • ES10 特性的完整指南
  • JavaScript函数式编程(一)
  • Java新版本的开发已正式进入轨道,版本号18.3
  • js 实现textarea输入字数提示
  • k个最大的数及变种小结
  • Mocha测试初探
  • Spark RDD学习: aggregate函数
  • Spring声明式事务管理之一:五大属性分析
  • 从@property说起(二)当我们写下@property (nonatomic, weak) id obj时,我们究竟写了什么...
  • 聊聊directory traversal attack
  • 网络应用优化——时延与带宽
  • 微信开源mars源码分析1—上层samples分析
  • CMake 入门1/5:基于阿里云 ECS搭建体验环境
  • Linux权限管理(week1_day5)--技术流ken
  • zabbix3.2监控linux磁盘IO
  • 关于Android全面屏虚拟导航栏的适配总结
  • ​14:00面试,14:06就出来了,问的问题有点变态。。。
  • #、%和$符号在OGNL表达式中经常出现
  • #Z2294. 打印树的直径
  • %check_box% in rails :coditions={:has_many , :through}
  • (2)(2.10) LTM telemetry
  • (3)nginx 配置(nginx.conf)
  • (C#)获取字符编码的类
  • (二十五)admin-boot项目之集成消息队列Rabbitmq
  • (仿QQ聊天消息列表加载)wp7 listbox 列表项逐一加载的一种实现方式,以及加入渐显动画...
  • (九)c52学习之旅-定时器
  • (离散数学)逻辑连接词
  • (一)pytest自动化测试框架之生成测试报告(mac系统)
  • .apk 成为历史!
  • .NET Core 通过 Ef Core 操作 Mysql
  • .net 流——流的类型体系简单介绍
  • .NET 中 GetProcess 相关方法的性能
  • .Net(C#)常用转换byte转uint32、byte转float等
  • .NET/C# 异常处理:写一个空的 try 块代码,而把重要代码写到 finally 中(Constrained Execution Regions)
  • @modelattribute注解用postman测试怎么传参_接口测试之问题挖掘
  • @Transactional 详解
  • [].shift.call( arguments ) 和 [].slice.call( arguments )