网站批量查询北京综合网络营销哪里好

张小明 2026/1/9 19:10:14
网站批量查询,北京综合网络营销哪里好,聊城集团网站建设加盟,如何做商业网站推广前言 Scrapy 框架中#xff0c;爬虫解析出的 Item 数据最终需落地存储#xff0c;而管道#xff08;Pipeline#xff09;是实现数据持久化的核心组件。相较于直接在爬虫文件中处理数据存储#xff0c;Pipeline 具备模块化、可扩展、支持多管道协同处理的优势#xff0c;…前言Scrapy 框架中爬虫解析出的 Item 数据最终需落地存储而管道Pipeline是实现数据持久化的核心组件。相较于直接在爬虫文件中处理数据存储Pipeline 具备模块化、可扩展、支持多管道协同处理的优势可灵活实现数据清洗、去重、多源存储如文件、数据库、消息队列等需求。本文从 Pipeline 核心原理入手系统讲解不同场景下的数据持久化方案结合实战案例实现 JSON/CSV 文件存储、MySQL 数据库存储、MongoDB 文档存储及数据去重与校验帮助开发者掌握企业级爬虫的数据落地能力。摘要本文聚焦 Scrapy 管道Pipeline的数据持久化实战首先剖析 Pipeline 的执行流程与优先级规则明确其核心作用与开发规范其次以 知乎热榜 为爬取目标依次实现基础文件存储JSON/CSV、关系型数据库存储MySQL、非关系型数据库存储MongoDB三类核心持久化方案最后补充数据去重、数据校验、管道异常处理等进阶功能。通过本文读者可掌握 Scrapy Pipeline 的全场景开发能力实现爬虫数据的规范化、高可用持久化存储。一、Scrapy Pipeline 核心原理1.1 Pipeline 执行流程爬虫解析出 Item 后将其传递给引擎Engine引擎将 Item 依次发送至settings.py中配置的 Pipeline按优先级执行每个 Pipeline 可通过process_item方法处理 Item如清洗、存储处理完成后可选择将 Item 传递给下一个 Pipeline 或终止传递所有 Pipeline 执行完成后Item 数据完成持久化。1.2 核心方法与优先级规则方法名触发时机返回值规则process_item处理每个 Item 时返回 Item继续传递给下一个 Pipeline抛出 DropItem 异常终止传递并丢弃 Itemopen_spider爬虫启动时无返回值常用于初始化资源如数据库连接、文件句柄close_spider爬虫关闭时无返回值常用于释放资源如关闭数据库连接、文件句柄from_crawler管道初始化时可选返回管道实例常用于从配置文件读取参数如数据库地址、端口Pipeline 优先级通过settings.py中ITEM_PIPELINES的数字值控制数字越小优先级越高越先执行。例如python运行ITEM_PIPELINES { zhihu_hot.pipelines.DataValidatePipeline: 100, # 优先级最高数据校验 zhihu_hot.pipelines.MysqlPipeline: 300, # 次之MySQL 存储 zhihu_hot.pipelines.MongoDBPipeline: 400, # 优先级最低MongoDB 存储 }1.3 核心优势优势点具体说明模块化开发不同存储方式拆分至不同 Pipeline便于维护与扩展多管道协同可先校验数据再存储至多个数据源如同时存 MySQL 和 MongoDB资源统一管理通过open_spider/close_spider统一管理数据库连接、文件句柄等资源异常隔离单个 Pipeline 异常不影响其他 Pipeline 执行二、环境搭建2.1 基础环境要求软件 / 库版本要求作用Python≥3.8基础开发环境Scrapy≥2.6爬虫框架pymysql≥1.0.2Python 操作 MySQL 客户端pymongo≥4.3.3Python 操作 MongoDB 客户端pandas≥1.5.3生成 CSV 文件可选2.2 环境安装bash运行pip install scrapy2.6.2 pymysql1.0.2 pymongo4.3.3 pandas1.5.32.3 数据库环境准备1MySQL 环境安装 MySQL 并启动服务创建数据库与数据表sqlCREATE DATABASE IF NOT EXISTS scrapy_db DEFAULT CHARACTER SET utf8mb4; USE scrapy_db; CREATE TABLE IF NOT EXISTS zhihu_hot ( id INT AUTO_INCREMENT PRIMARY KEY, title VARCHAR(500) NOT NULL COMMENT 热榜标题, hot_score INT COMMENT 热度值, url VARCHAR(500) NOT NULL COMMENT 热榜链接, create_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT 创建时间, UNIQUE KEY uk_url (url) COMMENT URL 唯一索引避免重复 ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;2MongoDB 环境安装 MongoDB 并启动服务无需提前创建集合MongoDB 自动创建。三、Pipeline 数据持久化实战开发3.1 创建基础爬虫项目bash运行# 创建项目 scrapy startproject zhihu_hot # 进入项目目录 cd zhihu_hot # 创建爬虫文件 scrapy genspider zhihu_hot_spider zhihu.com3.2 定义 Item 结构items.pypython运行import scrapy class ZhihuHotItem(scrapy.Item): # 热榜标题 title scrapy.Field() # 热度值 hot_score scrapy.Field() # 热榜链接 url scrapy.Field()3.3 开发爬虫文件zhihu_hot_spider.pypython运行import scrapy from zhihu_hot.items import ZhihuHotItem class ZhihuHotSpiderSpider(scrapy.Spider): name zhihu_hot_spider allowed_domains [zhihu.com] start_urls [https://www.zhihu.com/hot] def parse(self, response): 解析知乎热榜数据 # 定位热榜列表 hot_list response.xpath(//div[classHotItem-content]) for hot in hot_list: item ZhihuHotItem() # 提取标题 item[title] hot.xpath(.//h2[classHotItem-title]/a/text()).extract_first() # 提取热度值处理格式如 100万 转为 1000000 hot_score_str hot.xpath(.//div[classHotItem-metrics]/text()).extract_first() or 0 if 万 in hot_score_str: item[hot_score] int(float(hot_score_str.replace(万, ).replace(, )) * 10000) else: item[hot_score] int(hot_score_str.replace(, )) if hot_score_str.replace(, ).isdigit() else 0 # 提取链接拼接完整 URL relative_url hot.xpath(.//h2[classHotItem-title]/a/href).extract_first() item[url] fhttps://www.zhihu.com{relative_url} if relative_url else yield item3.4 实战 1基础文件存储JSON/CSV3.4.1 JSON 存储 Pipelinepipelines.pypython运行import json import os from scrapy.exceptions import DropItem class JsonPipeline: 将 Item 存储为 JSON 文件 def open_spider(self, spider): 爬虫启动时创建 JSON 文件 # 确保存储目录存在 if not os.path.exists(output): os.makedirs(output) # 打开文件句柄 self.file open(output/zhihu_hot.json, w, encodingutf-8) # 写入 JSON 数组开头 self.file.write([) self.first_item True def process_item(self, item, spider): 处理每个 Item写入文件 # 过滤空标题的 Item if not item.get(title): raise DropItem(f丢弃无效 Item标题为空 {item}) # 转换 Item 为字典 item_dict dict(item) # 处理第一个 Item 无需添加逗号 if not self.first_item: self.file.write(,) else: self.first_item False # 序列化并写入ensure_asciiFalse 保留中文 self.file.write(json.dumps(item_dict, ensure_asciiFalse, indent2)) spider.logger.info(fJSON 存储成功{item[title]}) return item def close_spider(self, spider): 爬虫关闭时关闭文件 # 写入 JSON 数组结尾 self.file.write(]) self.file.close() spider.logger.info(JSON 文件存储完成路径output/zhihu_hot.json)3.4.2 CSV 存储 Pipelinepipelines.pypython运行import pandas as pd import os from scrapy.exceptions import DropItem class CsvPipeline: 将 Item 存储为 CSV 文件 def open_spider(self, spider): 初始化数据列表 self.data_list [] if not os.path.exists(output): os.makedirs(output) def process_item(self, item, spider): 收集 Item 数据 if not item.get(url): raise DropItem(f丢弃无效 ItemURL 为空 {item}) self.data_list.append(dict(item)) spider.logger.info(fCSV 数据收集成功{item[title]}) return item def close_spider(self, spider): 将收集的数据写入 CSV 文件 if self.data_list: df pd.DataFrame(self.data_list) # 按热度值降序排列 df df.sort_values(byhot_score, ascendingFalse) df.to_csv(output/zhihu_hot.csv, indexFalse, encodingutf-8-sig) spider.logger.info(fCSV 文件存储完成共 {len(self.data_list)} 条数据路径output/zhihu_hot.csv) else: spider.logger.warning(无有效数据未生成 CSV 文件)3.4.3 启用文件存储 Pipelinesettings.pypython运行ITEM_PIPELINES { zhihu_hot.pipelines.JsonPipeline: 200, zhihu_hot.pipelines.CsvPipeline: 250, }3.4.4 输出结果与原理JSON 文件输出示例output/zhihu_hot.jsonjson[ { title: 为什么现在的年轻人越来越不愿意结婚了, hot_score: 1258000, url: https://www.zhihu.com/question/6328XXXX }, { title: 2025 年养老金调整方案公布哪些人受益最多, hot_score: 985000, url: https://www.zhihu.com/question/6329XXXX } ]CSV 文件输出示例output/zhihu_hot.csvtitlehot_scoreurl为什么现在的年轻人越来越不愿意结婚了1258000https://www.zhihu.com/question/6328XXXX2025 年养老金调整方案公布哪些人受益最多985000https://www.zhihu.com/question/6329XXXX核心原理open_spider初始化资源文件句柄、数据列表避免频繁创建 / 关闭文件process_item过滤无效 Item抛出DropItem异常有效 Item 写入文件或收集到列表close_spider释放资源关闭文件句柄并对 CSV 数据进行排序后写入ensure_asciiFalse和utf-8-sig确保中文正常显示。3.5 实战 2MySQL 数据库存储3.5.1 MySQL 存储 Pipelinepipelines.pypython运行import pymysql from scrapy.exceptions import DropItem from twisted.enterprise import adbapi # 异步数据库操作 class MysqlPipeline: 异步将 Item 存储至 MySQL 数据库 def __init__(self, db_pool): self.db_pool db_pool classmethod def from_crawler(cls, crawler): 从配置文件读取数据库参数创建数据库连接池 db_params { host: crawler.settings.get(MYSQL_HOST, 127.0.0.1), port: crawler.settings.get(MYSQL_PORT, 3306), user: crawler.settings.get(MYSQL_USER, root), password: crawler.settings.get(MYSQL_PASSWORD, 123456), db: crawler.settings.get(MYSQL_DB, scrapy_db), charset: utf8mb4, cursorclass: pymysql.cursors.DictCursor } # 创建异步数据库连接池 db_pool adbapi.ConnectionPool(pymysql, **db_params) return cls(db_pool) def process_item(self, item, spider): 异步执行插入操作 # 过滤热度值为 0 的 Item if item.get(hot_score, 0) 0: raise DropItem(f丢弃无效 Item热度值为 0 {item}) # 异步调用插入方法 query self.db_pool.runInteraction(self.insert_item, item) # 处理插入异常 query.addErrback(self.handle_error, item, spider) return item def insert_item(self, cursor, item): 执行 SQL 插入 sql INSERT INTO zhihu_hot (title, hot_score, url) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE title VALUES(title), hot_score VALUES(hot_score) cursor.execute(sql, (item[title], item[hot_score], item[url])) # ON DUPLICATE KEY UPDATE 实现重复 URL 时更新数据 def handle_error(self, failure, item, spider): 处理数据库操作异常 spider.logger.error(fMySQL 插入失败{failure}Item{item}) def close_spider(self, spider): 关闭数据库连接池 self.db_pool.close() spider.logger.info(MySQL 连接池已关闭)3.5.2 配置 MySQL 参数settings.pypython运行# MySQL 数据库配置 MYSQL_HOST 127.0.0.1 MYSQL_PORT 3306 MYSQL_USER root MYSQL_PASSWORD 123456 # 替换为你的 MySQL 密码 MYSQL_DB scrapy_db # 启用 MySQL Pipeline ITEM_PIPELINES { zhihu_hot.pipelines.JsonPipeline: 200, zhihu_hot.pipelines.CsvPipeline: 250, zhihu_hot.pipelines.MysqlPipeline: 300, }3.5.3 输出结果与原理MySQL 数据表查询结果sqlSELECT title, hot_score FROM zhihu_hot LIMIT 2;titlehot_score为什么现在的年轻人越来越不愿意结婚了12580002025 年养老金调整方案公布哪些人受益最多985000核心原理from_crawler从配置文件读取参数解耦配置与代码adbapi.ConnectionPool创建异步连接池避免同步操作阻塞爬虫runInteraction异步执行 SQL 插入提升爬虫效率ON DUPLICATE KEY UPDATE利用 URL 唯一索引实现数据去重重复 URL 时更新标题和热度值addErrback捕获数据库操作异常避免单个 Item 插入失败导致爬虫中断。3.6 实战 3MongoDB 文档存储3.6.1 MongoDB 存储 Pipelinepipelines.pypython运行import pymongo from scrapy.exceptions import DropItem class MongoDBPipeline: 将 Item 存储至 MongoDB 数据库 def __init__(self, mongo_uri, mongo_db): self.mongo_uri mongo_uri self.mongo_db mongo_db self.client None self.db None classmethod def from_crawler(cls, crawler): 从配置文件读取 MongoDB 参数 return cls( mongo_uricrawler.settings.get(MONGO_URI, mongodb://127.0.0.1:27017/), mongo_dbcrawler.settings.get(MONGO_DB, scrapy_db) ) def open_spider(self, spider): 创建 MongoDB 连接 self.client pymongo.MongoClient(self.mongo_uri) self.db self.client[self.mongo_db] # 创建索引确保 URL 唯一 self.db.zhihu_hot.create_index(url, uniqueTrue) spider.logger.info(MongoDB 连接成功索引创建完成) def process_item(self, item, spider): 插入 Item 至 MongoDB try: # 插入数据重复 URL 时更新 self.db.zhihu_hot.update_one( {url: item[url]}, {$set: dict(item)}, upsertTrue # 不存在则插入存在则更新 ) spider.logger.info(fMongoDB 存储成功{item[title]}) except pymongo.errors.DuplicateKeyError: spider.logger.warning(fMongoDB 重复数据{item[url]}) except Exception as e: spider.logger.error(fMongoDB 插入失败{e}Item{item}) raise DropItem(fMongoDB 插入失败丢弃 Item{item}) return item def close_spider(self, spider): 关闭 MongoDB 连接 self.client.close() spider.logger.info(MongoDB 连接已关闭)3.6.2 配置 MongoDB 参数settings.pypython运行# MongoDB 配置 MONGO_URI mongodb://127.0.0.1:27017/ MONGO_DB scrapy_db # 启用 MongoDB Pipeline ITEM_PIPELINES { zhihu_hot.pipelines.JsonPipeline: 200, zhihu_hot.pipelines.CsvPipeline: 250, zhihu_hot.pipelines.MysqlPipeline: 300, zhihu_hot.pipelines.MongoDBPipeline: 400, }3.6.3 输出结果与原理MongoDB 集合查询结果bash运行mongo use scrapy_db db.zhihu_hot.find().limit(2)json[ { _id: ObjectId(67658xxxxxx), title: 为什么现在的年轻人越来越不愿意结婚了, hot_score: 1258000, url: https://www.zhihu.com/question/6328XXXX }, { _id: ObjectId(67658xxxxxx), title: 2025 年养老金调整方案公布哪些人受益最多, hot_score: 985000, url: https://www.zhihu.com/question/6329XXXX } ]核心原理create_index(url, uniqueTrue)创建 URL 唯一索引避免重复数据update_one结合upsertTrue实现 “存在则更新不存在则插入”捕获DuplicateKeyError异常优雅处理重复数据避免爬虫中断MongoDB 无需提前定义表结构适配动态字段扩展如后续新增字段无需修改表结构。3.7 实战 4数据去重与校验 Pipeline进阶3.7.1 全局去重 Pipelinepipelines.pypython运行class DuplicateFilterPipeline: 基于 Redis 实现全局数据去重跨爬虫/跨节点 def __init__(self, redis_uri): import redis self.redis_client redis.Redis.from_url(redis_uri) self.redis_key zhihu_hot:urls # 存储已爬取 URL 的 Redis Key classmethod def from_crawler(cls, crawler): return cls(redis_uricrawler.settings.get(REDIS_URI, redis://127.0.0.1:6379/0)) def process_item(self, item, spider): url item.get(url) if not url: raise DropItem(fURL 为空丢弃 Item{item}) # Redis SET 实现去重 if self.redis_client.sismember(self.redis_key, url): raise DropItem(f全局重复 URL丢弃 Item{url}) else: self.redis_client.sadd(self.redis_key, url) spider.logger.info(f全局去重校验通过{url}) return item def close_spider(self, spider): self.redis_client.close()3.7.2 启用去重 Pipelinesettings.pypython运行# Redis 配置 REDIS_URI redis://127.0.0.1:6379/0 # 调整 Pipeline 优先级去重优先执行 ITEM_PIPELINES { zhihu_hot.pipelines.DuplicateFilterPipeline: 100, # 最高优先级 zhihu_hot.pipelines.JsonPipeline: 200, zhihu_hot.pipelines.CsvPipeline: 250, zhihu_hot.pipelines.MysqlPipeline: 300, zhihu_hot.pipelines.MongoDBPipeline: 400, }核心原理利用 Redis Set 的sismember方法判断 URL 是否已存在实现全局去重优先级 100 确保去重在校验、存储前执行减少无效数据处理跨节点 / 跨爬虫运行时Redis 可共享去重数据避免分布式环境下的重复爬取。四、Pipeline 性能调优与最佳实践4.1 性能调优策略优化点具体方案数据库操作优化使用异步连接池adbapi避免同步阻塞批量插入每 100 条批量提交文件操作优化减少文件写入次数先收集数据爬虫关闭时一次性写入去重优化优先使用内存 / Redis 去重再使用数据库唯一索引异常处理优化捕获特定异常避免通用异常导致所有 Item 处理失败4.2 最佳实践单一职责原则每个 Pipeline 只负责一个功能如校验、JSON 存储、MySQL 存储配置解耦通过from_crawler读取配置避免硬编码数据库地址、密码资源管理通过open_spider/close_spider统一管理连接 / 文件句柄避免资源泄露异常日志详细记录异常信息如 Item 内容、错误类型便于问题定位数据校验在前置 Pipeline 完成数据校验避免无效数据进入存储环节。五、常见问题与解决方案问题现象原因分析解决方案JSON 文件中文乱码未设置 ensure_asciiFalse序列化时添加ensure_asciiFalseCSV 文件中文乱码编码格式错误使用utf-8-sig编码写入 CSVMySQL 插入速度慢同步插入 / 单条插入使用 adbapi 异步插入批量提交 SQLMongoDB 重复插入未创建唯一索引为 URL 字段创建唯一索引使用 upsertTruePipeline 不执行未在 settings.py 中启用 / 优先级配置错误检查ITEM_PIPELINES配置确保数字优先级正确数据库连接超时数据库地址错误 / 防火墙拦截验证数据库地址开放数据库端口MySQL 3306、MongoDB 27017六、总结本文系统讲解了 Scrapy Pipeline 的数据持久化开发从核心原理出发实现了文件存储JSON/CSV、关系型数据库存储MySQL、非关系型数据库存储MongoDB三类核心方案并补充了全局数据去重、异常处理等进阶功能。Pipeline 作为 Scrapy 数据持久化的核心组件其模块化、可扩展的特性使其适配各类存储场景是企业级爬虫开发的必备技能。在实际开发中可根据业务需求扩展更多 Pipeline 功能如数据加密存储、数据同步至 Elasticsearch、数据推送至消息队列Kafka/RabbitMQ等。掌握 Pipeline 的开发规范与性能调优策略可大幅提升爬虫数据存储的稳定性、效率与可维护性。
版权声明:本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

同行抄袭公司网站如何设计一个企业

光的本质粒子流一、核心辐射机制(基础分类)这是光产生的两种最根本跃迁方式,其他发光机制本质上可归为这两类的延伸或具体应用。• 自发辐射:粒子在无外部刺激时,自发从高能级跳向低能级,随机释放光子。 特…

张小明 2026/1/5 13:24:54 网站建设

网站开发服务承诺书网站建设培训 ppt

Linly-Talker表情过渡平滑算法:让数字人更懂“渐入情绪” 在虚拟主播流畅讲解产品、AI客服温柔回应咨询的今天,你是否曾留意过它们的表情变化?那些从微笑到惊讶、从平静到关切的转换,是否自然得让你忘了对面是个程序?这…

张小明 2026/1/5 12:26:04 网站建设

网站ui怎么做的书法网站模板下载

电子课本下载工具:轻松获取PDF教材的终极解决方案 【免费下载链接】tchMaterial-parser 国家中小学智慧教育平台 电子课本下载工具 项目地址: https://gitcode.com/GitHub_Trending/tc/tchMaterial-parser 还在为找不到高质量的电子课本PDF而烦恼吗&#xff…

张小明 2026/1/7 15:23:19 网站建设

源码管理 网站app软件开发学什么专业

串口通信中的奇偶校验:从原理到实战的深度解析在嵌入式开发的世界里,UART(通用异步收发器)是最基础、也最常用的通信接口之一。无论是调试打印日志、连接传感器,还是与工业设备交互,我们几乎每天都在和串口…

张小明 2026/1/5 20:12:14 网站建设

适合大学生做的兼职网站有哪些wordpress最大图片

SDR信号调制解调原理:从零搞懂底层机制你有没有想过,为什么你的手机能同时支持4G、5G、Wi-Fi和蓝牙?为什么军用无线电能在战场上瞬间切换通信模式?答案就藏在软件定义无线电(Software-Defined Radio,简称SD…

张小明 2026/1/7 14:12:47 网站建设

网站建设需求范本免费做苗木的网站

第一章:Shell脚本的基本语法和命令Shell脚本是Linux/Unix系统中自动化任务的核心工具,它通过解释执行一系列命令来完成特定功能。编写Shell脚本时,通常以“shebang”开头,用于指定解释器路径。脚本的起始声明 所有Shell脚本应以如…

张小明 2026/1/5 20:12:09 网站建设