爬虫分析之数据存储——基于MySQL,Scrapy

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 上一篇->爬虫练习之数据整理——基于Pandas上上篇->爬虫练习之数据清洗——基于Pandas配置MySql关于MySQL在Ubuntu的Pycharm上的配置,可以参考这篇文章中的第三部分Mac安装mysql及终端操作mysql与pych...

上一篇->爬虫练习之数据整理——基于Pandas
上上篇->爬虫练习之数据清洗——基于Pandas

配置MySql

关于MySQL在Ubuntu的Pycharm上的配置,可以参考这篇文章中的第三部分

Mac安装mysql及终端操作mysql与pycharm的数据库可视化

如果上面的步骤处理完毕后找不到你新建的数据库, 可以参照下图配置

勾选要显示的Schemas(数据库集合)

数据存储需要用到pymysql模块, 在File->Settings中找到如图的设置页面,点击加号搜索pymysql并安装

如何存储

在开始考虑如何存储之前, 我们需要考虑一个问题, 数据存储应该是什么时候要做的事.
假设你已经了解过Scrapy框架, 下面是来自官网对item pipeline的典型应用

  • 清理数据
  • 验证爬取的数据(检查item包含某些字段)
  • 查重(并丢弃)
  • 将爬取结果保存到数据库中

另请参阅官方文档>Item Pipeline

我们要实现的数据存储, 先来试一试能否成功吧

# 你可以参考以下代码编写自己的pipeline
import pymysql

class jobCrawlerPipeline(object):
    def process_item(self, item, spider):
        '''
        将爬取的信息保存到mysql
        :param item:
        :param spider:
        :return: item
        '''
        # Get data from item
        job_name = item['job_name']
        company = item['company']
        address = item['address']
        salary = item['salary']
        time = item['time']

        # Connecting with local database, change the value if not the same
        db = pymysql.connect(
            host='localhost',
            user='root',
            passwd='1320',
            db='scrapyDB',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)
        try:
            # open the cursor
            cursor = db.cursor()
            sql = 'INSERT INTO tb_job(job_name,company,address,salary,time)' \
                  'VALUES ("%s", "%s", "%s", "%s", "%s")' % (job_name,company,address,salary,time)
            # execute the sql
            cursor.execute(sql)
            db.commit()
        finally:
            # close the connection
            db.close()
        return item

爬虫尚未结束, 但是通过终端, 我们知道该停下爬虫了.


爬取中...
存储在MySQL的信息

重新回到爬虫项目的思路

思考整个爬虫项目的流程, 应该是这样

抓取信息->清理信息->整理信息->存储信息->分析信息

数据整理

而上面的存储信息虽然已经成功了一部分,但是薪资信息仍需要整理,更重要的是爬取的信息中没有明确的id, 如何在后续中加入topSalary, bottomSalary 等整理后才有的信息与之对应呢?

重新审视Item Pipeline的典型应用, 我们能不能在Pipeline上实现整理,清理, 验证或是丢弃呢?

分析item中的项目, 整理和验证可能是最容易实现的部分
我们先把整理功能实现并验证是否成功, 在class jobCrawlerPipeline(object):中添加下面这个方法.用于把爬取下来的工资数据进行整理,关于这个方法的实现,请参考前一篇爬虫练习之数据整理——基于Pandas

    class jobCrawlerPipeline(object):

    def cut_word(self, word, method):
        if method == 'bottom':
            length = len(word)
            if (word.find('万') == -1):
                if (word.find('以下') != -1):
                    # XX千以下
                    postion = word.find('以下')
                    bottomSalary = str(word[:(postion - 5)])
                elif (word.find('以上') != -1):
                    postion = word.find('以上')
                    bottomSalary = str(float(word[:postion - 5]))
                else:
                    # XX千/月
                    postion = word.find('-')
                    bottomSalary = str(float(word[:(postion)]))
            else:
                if (word.find('年') == -1):
                    if (word.find('以下') != -1):
                        # XX万以下
                        postion = word.find('以下')
                        bottomSalary = str(float(word[:(postion - 5)]) * 10)
                    elif (word.find('以上') != -1):
                        # XX万以上
                        postion = word.find('以上')
                        bottomSalary = str(float(word[:postion - 5]) * 10)
                    elif (word.find('+') != -1):
                        # XX万+
                        postion = word.find('+')
                        bottomSalary = str(float(word[:(postion)]) * 10)
                    else:
                        # XX万/月
                        postion = word.find('-')
                        bottomSalary = str(float(word[:(postion)]) * 10)

                else:
                    if (word.find('以下') != -1):
                        # XX万以下/年
                        postion = word.find('以下')
                        bottomSalary = str(float(word[:(postion - 5)]) / 1.2)
                    elif (word.find('以上') != -1):
                        postion = word.find('以上')
                        bottomSalary = str(float(word[:postion - 5]) / 1.2)
                    elif (word.find('+') != -1):
                        # XX万+
                        postion = word.find('+')
                        bottomSalary = str(float(word[:(postion)]) / 1.2)
                    else:
                        # XX万/年
                        postion = word.find('-')
                        bottomSalary = word[:(postion)]
                        bottomSalary = str(float(bottomSalary) / 1.2)
            return bottomSalary

        if method == 'top':
            length = len(word)
            if (word.find('万') == -1):
                if (word.find('以下') != -1):
                    # XX千以下
                    postion = word.find('以下')
                    topSalary = str(float(word[:(postion - 5)]))
                elif (word.find('以上') != -1):
                    postion = word.find('以上')
                    topSalary = str(float(word[:postion - 5]))
                else:
                    # XX千/月
                    postion = word.find('-')
                    topSalary = str(float(word[(postion + 1):(length - 11)]))
            else:
                if (word.find('年') == -1):
                    if (word.find('以下') != -1):
                        # XX万以下
                        postion = word.find('以下')
                        topSalary = str(float(word[:(postion - 5)]) * 10)
                    elif (word.find('以上') != -1):
                        # XX万以上
                        postion = word.find('以上')
                        topSalary = str(float(word[:postion - 5]) * 10)
                    else:
                        # XX万/月
                        postion = word.find('-')
                        topSalary = str(float(word[(postion + 1):(length - 11)]) * 10)

                else:
                    if (word.find('以下') != -1):
                        # XX万以下/年
                        postion = word.find('以下')
                        topSalary = str(float(word[:(postion - 5)]) / 1.2)
                    elif (word.find('以上') != -1):
                        # XX万以上一年
                        postion = word.find('以上')
                        topSalary = str(float(word[:postion - 5]) / 1.2)
                    elif (word.find('+') != -1):
                        # XX万+
                        postion = word.find('+')
                        topSalary = str(float(word[:(postion)]) / 1.2)
                    else:
                        # XX万/年
                        postion = word.find('-')
                        topSalary = word[(postion + 1):(length - 11)]
                        topSalary = str(int(topSalary) / 1.2)
            return topSalary

如果你看了上面的代码, 你可能发现与前一篇有些许不同, 最主要的差别就是字符串数组切片的位置发生了改变.
为什么要改呢?

因为这是Python的编码坑啊

通过观察终端的输出,可以看到爬下来尚未存储的数据是以unicode的形式存在,这个时候是5个字节一个中文
因此看到下面截图中的salary,可以判断要得到薪资的底薪和顶薪,需要剔除掉11个字节

爬取数据中

数据清洗

至此,数据的基本处理已经合并到Pipeline中,鉴于可能还有脏数据在item中,我们在Pipeline的process_item方法中加入相应的代码
这段代码应当加在处理数据之前,减少一些系统开销

# Get data from item
        job_name = item['job_name']
        salary = item['salary']

        dirty_job_name = re.compile(r'(\*|在家|试用|体验|无需|无须|试玩|红包)+')
        dirty_salary = re.compile(r'(小时|天)+')

        # clean dirty data
        if(dirty_job_name.search(str(job_name))):
            raise DropItem("Dirty data %s" % item)
        if(dirty_salary.search(str(salary))):
            raise DropItem("Dirty data %s" % item)
        if(salary == None):
            raise DropItem("Dirty data %s" % item)

数据存储

把清洗并整理完毕的数据进行数据存储

建立数据库的相关MySql语句是

CREATE DATABASE IF NOT EXISTS scrapyDB DEFAULT CHARACTER SET utf8;

CREATE TABLE IF NOT EXISTS `tb_job`(
  `job_id` bigint NOT NULL AUTO_INCREMENT,
  `job_name` varchar(50) NOT NULL,
  `company` varchar(50) NOT NULL,
  `address` varchar(50) NOT NULL,
  `bottom_salary` varchar(10) NOT NULL,
  `top_salary` varchar(10) NOT NULL,
  `salary` varchar(15) NOT NULL,
  `time` varchar(10) NOT NULL,
  PRIMARY KEY (`job_id`),
  UNIQUE KEY `unique_info`(`job_name`, `company`, `address`)
  );

这里实现的思路不止一种

Solution 1 在process_item中直接将处理完的item保存到数据库中

实际测试的时候发现保存下来的数据除了job_name字段外, 其他中文字段全部变成Unicode码, 原因不明. 大家如果成功用这种方法实现了, 不妨在留言区告知一下, 毕竟第二种方法多了文件IO的开销, 耗时会比较大

Solution 2 在爬取结束之后再进行数据库写入操作

爬取结束后, 用pandas模块的csv读取函数打开爬取完毕的csv文件, 写入数据库

Attention!

以上两种方法的commit()建议在全部插入后一次commit完成
必须在close_spider方法中关闭数据库
若使用第一种方法, 建议在open_spider中实现数据库初始化工作, 而不是每执行一次process_item进行一次打开关闭数据库

写入数据库

参考代码

# Function1
def open_spider(self, spider):
        self.conn = pymysql.connect(
            host='localhost',
            user='root',
            passwd='mysql',
            db='scrapyDB',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)

def close_spider(self, spider):
        try:
            # open the cursor
            self.cursor = self.conn.cursor()

            # get data from csv file
            # reload data
            f = open(r'job.csv', 'r')
            f.close()
            job_info = pandas.read_csv(r'job.csv', iterator=True,chunksize=1,
                                       header=None,names=
                                       ['job_name','company','address','bottom_salary','top_salary','salary','time'])

            # store data
            for i, job in enumerate (job_info):
                # use -1 or ' ' to fill NAN
                job = job.fillna({'job_name':'','company':'','address':'','time':''})
                job = job.fillna(-1)
                # transform series to list type
                job = job.values[0]

                sql = 'INSERT INTO tb_job(job_name,company,address,bottom_salary,top_salary,salary,time)' \
                      'VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s")' % (
                      job[6], job[2], job[3], job[1], job[5], job[0], job[6])
                self.cursor.execute(sql)
            self.conn.commit()

        finally:
            # close the connection
            self.conn.close()

# Function2 
# 未将打开关闭数据库拆分出来, 请自行修改
db = pymysql.connect(
            host='localhost',
            user='root',
            passwd='mysql',
            db='scrapyDB',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor)
        try:
            # open the cursor
            cursor = db.cursor()
            sql = 'INSERT INTO tb_job(job_name,company,address,bottom_salary,top_salary,salary,time)' \
                  'VALUES ("%s", "%s", "%s", "%s", "%s", "%s", "%s")' % (job_name,item['company'],item['address'],item['bottomSalary'],item['topSalary'],item['salary'],item['time'])
            # execute the sql
            cursor.execute(sql)
            db.commit()
        finally:
            # close the connection
            db.close()

最终的数据库代码中, 暂时删除了unique_info索引, 原因是当前只需要尚不需要进行增量爬取. 使用unique_info索引后, 如果遇到重复的数据将直接RollBack, 而我们是在最后才一次性commit的, 这样肯定不行
就需要增加开销去每插入一条数据提交一次
后续将对这个问题进行处理, 敬请期待

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
2月前
|
数据采集 缓存 定位技术
网络延迟对Python爬虫速度的影响分析
网络延迟对Python爬虫速度的影响分析
|
2月前
|
数据采集 存储 JSON
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第27天】本文介绍了Python网络爬虫Scrapy框架的实战应用与技巧。首先讲解了如何创建Scrapy项目、定义爬虫、处理JSON响应、设置User-Agent和代理,以及存储爬取的数据。通过具体示例,帮助读者掌握Scrapy的核心功能和使用方法,提升数据采集效率。
149 6
|
19天前
|
SQL 关系型数据库 MySQL
MySQL事务日志-Undo Log工作原理分析
事务的持久性是交由Redo Log来保证,原子性则是交由Undo Log来保证。如果事务中的SQL执行到一半出现错误,需要把前面已经执行过的SQL撤销以达到原子性的目的,这个过程也叫做"回滚",所以Undo Log也叫回滚日志。
MySQL事务日志-Undo Log工作原理分析
|
2天前
|
关系型数据库 MySQL 数据库
mysql慢查询每日汇报与分析
通过启用慢查询日志、提取和分析慢查询日志,可以有效识别和优化数据库中的性能瓶颈。结合适当的自动化工具和优化措施,可以显著提高MySQL数据库的性能和稳定性。希望本文的详解和示例能够为数据库管理人员提供有价值的参考,帮助实现高效的数据库管理。
27 11
|
1月前
|
SQL 关系型数据库 MySQL
MySQL 窗口函数详解:分析性查询的强大工具
MySQL 窗口函数从 8.0 版本开始支持,提供了一种灵活的方式处理 SQL 查询中的数据。无需分组即可对行集进行分析,常用于计算排名、累计和、移动平均值等。基本语法包括 `function_name([arguments]) OVER ([PARTITION BY columns] [ORDER BY columns] [frame_clause])`,常见函数有 `ROW_NUMBER()`, `RANK()`, `DENSE_RANK()`, `SUM()`, `AVG()` 等。窗口框架定义了计算聚合值时应包含的行。适用于复杂数据操作和分析报告。
79 11
|
3月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1788 14
MySQL事务日志-Redo Log工作原理分析
|
2月前
|
数据采集 存储 JSON
Python爬虫开发中的分析与方案制定
Python爬虫开发中的分析与方案制定
|
2月前
|
数据采集 前端开发 中间件
Python网络爬虫:Scrapy框架的实战应用与技巧分享
【10月更文挑战第26天】Python是一种强大的编程语言,在数据抓取和网络爬虫领域应用广泛。Scrapy作为高效灵活的爬虫框架,为开发者提供了强大的工具集。本文通过实战案例,详细解析Scrapy框架的应用与技巧,并附上示例代码。文章介绍了Scrapy的基本概念、创建项目、编写简单爬虫、高级特性和技巧等内容。
124 4
|
2月前
|
数据采集 中间件 API
在Scrapy爬虫中应用Crawlera进行反爬虫策略
在Scrapy爬虫中应用Crawlera进行反爬虫策略
|
3月前
|
存储 关系型数据库 MySQL
基于案例分析 MySQL 权限认证中的具体优先原则
【10月更文挑战第26天】本文通过具体案例分析了MySQL权限认证中的优先原则,包括全局权限、数据库级别权限和表级别权限的设置与优先级。全局权限优先于数据库级别权限,后者又优先于表级别权限。在权限冲突时,更严格的权限将被优先执行,确保数据库的安全性与资源合理分配。