V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
a65420321a
V2EX  ›  Python

多线程爬虫写入 mysql 特别的慢

  •  
  •   a65420321a · 2018-10-30 16:00:24 +08:00 · 4189 次点击
    这是一个创建于 2211 天前的主题,其中的信息可能已经有所发展或是发生改变。
    30 条回复    2018-11-10 01:07:58 +08:00
    liuxu
        1
    liuxu  
       2018-10-30 16:01:13 +08:00   ❤️ 2
    那你让它快点写
    gaius
        2
    gaius  
       2018-10-30 16:03:29 +08:00
    磁盘不行?
    jinue9900
        3
    jinue9900  
       2018-10-30 16:04:04 +08:00
    用 redis 缓存一层再持久(滑稽)
    a65420321a
        4
    a65420321a  
    OP
       2018-10-30 16:05:32 +08:00
    #!/usr/local/bin/python2.7
    # -*- coding:utf-8 -*-
    import re, time, random, hashlib, urllib, requests, os, math, json, sys, base64, torndb, uuid, threading, sys, itertools, copy, traceback
    import requests;requests.packages.urllib3.disable_warnings()
    from common import *
    from config import *

    mkdir('jobs')

    class MyWorker(Worker):
    table_coms = 'zp_coms'
    table_jobs = 'zp_jobs'
    htmlfolder = 'jobs'
    def crawlTask(self):
    try:
    url = self.getAddr() # 获取 url
    htm = self.getHtml(url, None, 0)#获取 html
    #getJob 解析 html 获取职位数据
    self.record.update(self.getJob(htm))
    self.record['flag'] = 10
    except:
    self.record['flag'] = 99
    print traceback.format_exc()
    finally:
    #向每条职位数据中添加字段
    self.record['job_link_href'] = url
    self.record['company_type']=''
    self.record['company_tel']=''
    self.record['company_email']=''
    self.record['job_type']=''
    self.record['job_type_code']=''
    self.record['company_fax']=''
    #本次任务处理完成后更新数据库
    self.update(self.table_jobs, self.record)
    def getJob(self, job):
    self.record['job_title'] = grep(u'<h1>(.+?)</h1>', job)
    self.record['job_salary'] = grep(u'<span class="red">(.+?)</span>', job)
    self.record['job_date'] = grep(u'<span.*?>发布于(.+?)</span>', job)
    infos = grep(u'<div class="info-primary">.+?<p>(.+?)</p>', job, re.S).replace('<em class="vline"></em>', '|').split('|')
    for info in infos:
    if u'城市' in info:
    self.record['city_text'] = info[3:]
    self.record['job_city_code'] = info[3:]
    break
    cmp_infos = grep(u'h3 class="name".+?p>(.+?)</p>',job,re.S).replace('<em class="vline"></em>', '|').split('|')
    for info in cmp_infos:
    info = re.sub('<.+?>', '',info)
    if info in paramx['s'].values():
    self.record['company_size'] = info
    continue
    if info in paramx['i'].values():
    self.record['job_industry_code'] = info
    self.record['company_industry'] = info
    continue
    self.record['company_linkman'] = re.sub('<.*?>', '',grep(u'<h2 class="name">(.+?)</h2>',job,re.S)).strip()
    self.record['address'] = grep(u'div class="location-address">(.+?)</div>',job,re.S)
    self.record['company_link_url'] = 'https://www.zhipin.com'+grep(u'ka="job-detail-company" href="(.+?)"', job,re.S)
    self.record['company_name'] = grep(u'<h3 class="name".+?>(.+?)<.*?/h3>', job)
    self.record['cmp_company_id']=grep(u'"job-detail-company" href="/gongsi/(.+?).html"', job)
    def checkHtml(self, html):#用于检查页面是否有数据需要解析
    if html.find(u'<title>BOSS 直聘验证码</title>' )>=0:return 0
    if html.find(u'您暂时无法继续访问~' )>=0:return 0
    return 1
    def reqHtml(self, addr, data=None):#获取 response
    headers = {
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8',
    'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
    }
    return requests.get(addr, timeout=5, proxies=random.choice(proxies), headers=headers).content.decode('utf8', errors='ignore')
    def getAddr(self):#构建 url
    return 'https://www.zhipin.com/job_detail/%(cid)s.html?' % self.record

    class MyLeader(Leader):
    table_coms = 'zp_coms'
    table_jobs = 'zp_jobs'
    def getTaskFromDatabase(self):#从数据库中获取一万条任务
    return self.dbconn.query("SELECT `id`,`flag`,`cid` FROM `%s` WHERE `flag` IN (0) LIMIT 10000;" % self.table_jobs)
    if __name__=='__main__':
    dbconn = torndb.Connection(**dbconf)
    MyLeader().runWork(MyWorker,1)

    #Leader 类 runWork()方法为开启 MyWorker 多线程的方法,数字参数为指定线程数量
    #getTaskFromDatabase()从数据库取出一万条数据(cid 用于职位链接,flag 用于标识抓取进度,id 用于更新数据库)
    #MyWorker 为多线程类,run 方法中,self.leader.getTask()方法循环取出一条任务绑定 self.record,然后 crawlTask 函数处理任务,处理完成后 self.leader.popTask(self.record)删除此任务
    #crawlTask()函数内部解析任务获取职位数据,并更新 self.record,最后 update()方法存入数据库
    a65420321a
        5
    a65420321a  
    OP
       2018-10-30 16:06:55 +08:00
    妈耶~代码插进去好乱~
    bantao
        6
    bantao  
       2018-10-30 16:11:05 +08:00
    加缓存,批量写。
    jason94
        7
    jason94  
       2018-10-30 16:13:55 +08:00
    一般瓶颈在硬盘,用 redis 缓存一下
    xyjincan
        8
    xyjincan  
       2018-10-30 16:26:31 +08:00   ❤️ 1
    妈呀,爬虫速度比数据库还慢,大哥,你给存数据库提取出来一个服务,在内存排队多线程提交,前台脚本只负责爬数据,转交给数据库保存服务就行了,流水线才好提速
    a65420321a
        9
    a65420321a  
    OP
       2018-10-30 16:37:29 +08:00
    @xyjincan 原本是这么做的,被数据库保存搞了好几天搞得有些懵,原来一个爬一个存,但是爬的快存的慢跑步了多久整个进程就崩了。。然后我给改成了爬完就存多开些线程,进程倒是不会崩了,整体速度更慢了
    itskingname
        10
    itskingname  
       2018-10-30 16:44:30 +08:00   ❤️ 1
    爬虫写数据不要直接写 MySQL,速度慢,并发高还会出现锁错误。建议用 MongoDB,会快非常多。

    请关注我的爬虫书: https://www.v2ex.com/t/493016#reply615
    realpg
        11
    realpg  
       2018-10-30 16:44:40 +08:00
    你的数据库 IO 扛不住了还是锁的原因?
    自己性能分析一下 再找对应解决方案
    neoblackcap
        12
    neoblackcap  
       2018-10-30 16:47:03 +08:00
    首先请确认你的 Mysql 适配器支不支持多线程,据我了解,多数适配都不是线程安全的,多线程写会出现问题。
    其二,最好还是先汇总,然后由一个单一 worker 写入,因为写入压力全在磁盘,多线程写入不会提高性能。
    xyjincan
        13
    xyjincan  
       2018-10-30 16:49:25 +08:00
    @a65420321a 你可以在提交任务列表上加一个长度控制,如果内存快满了,就让服务调用方堵塞一会,先暂停爬取
    whypool
        14
    whypool  
       2018-10-30 16:57:07 +08:00
    批量 insert 效率高

    爬的数据先放内存,然后走批量
    luozic
        15
    luozic  
       2018-10-30 17:04:49 +08:00 via iPhone
    爬虫数据刚拉回来又没清洗存关系数据库做啥? 直接上 nosql
    jjianwen68
        16
    jjianwen68  
       2018-10-30 17:04:58 +08:00
    缓存爬到的数据,批量写入,缓存太大时暂停爬取
    a65420321a
        17
    a65420321a  
    OP
       2018-10-30 17:25:25 +08:00
    @luozic 我也很想。。。甲方爸爸要 mysql
    lyhiving
        18
    lyhiving  
       2018-10-30 17:28:42 +08:00
    排队,队列来弄。
    huaerxiela
        19
    huaerxiela  
       2018-10-30 17:30:53 +08:00
    twisted 异步写
    Itoktsnhc
        20
    Itoktsnhc  
       2018-10-30 17:42:42 +08:00
    加队列嘛,然后用时间间隔和待入库数据量做批量插入。
    luozic
        21
    luozic  
       2018-10-30 19:53:10 +08:00 via iPhone
    先写 nosql 后面再同步到 mysql
    a65420321a
        22
    a65420321a  
    OP
       2018-10-31 10:40:15 +08:00
    我。。。。换了个数据库,重构了一下表结构,速度上去了
    CEBBCAT
        23
    CEBBCAT  
       2018-11-01 14:48:37 +08:00 via Android
    @a65420321a 可以详细说说吗?
    CEBBCAT
        24
    CEBBCAT  
       2018-11-01 14:55:53 +08:00 via Android
    @xyjincan 请教一下,服务器中跑的服务很多,瞬息万变,可能一转眼系统就开始杀进程了,该如何把握时机把数据存入数据库呢?
    CEBBCAT
        25
    CEBBCAT  
       2018-11-01 15:12:35 +08:00 via Android
    @a65420321a #4 请问可以把源码贴一份到 gist 吗?想学习一下,多谢
    wersonliu9527
        26
    wersonliu9527  
       2018-11-02 09:58:07 +08:00
    对于中小量数据,直接用 pandas
    pd.Dataframe([]) 暂存内存后直接 to_sql,大量数据 用 scrapy+mysql/mongodb
    a65420321a
        27
    a65420321a  
    OP
       2018-11-02 11:43:40 +08:00   ❤️ 1
    @CEBBCAT 测试数据库没有配置好,换到了正式库上面,重新建了个表,索引主键唯一值什么的定义好,代码原封不动跑一遍,速度上来了。
    虽然还是很慢。。。。。
    a65420321a
        28
    a65420321a  
    OP
       2018-11-02 12:24:35 +08:00
    @wersonliu9527 140 万条数据,大小估摸在 600M 左右,试过 pandas,114M 数据导入 mysql 的时候会卡死(试了 3 次,每次都卡一个小时没反应,数据库也没变化)。
    xyjincan
        29
    xyjincan  
       2018-11-09 21:42:15 +08:00
    @CEBBCAT 杀进程,,,你给你的程序注册一个 kill 事件响应?
    CEBBCAT
        30
    CEBBCAT  
       2018-11-10 01:07:58 +08:00 via Android
    @xyjincan 对哦!😮
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   5609 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 08:22 · PVG 16:22 · LAX 00:22 · JFK 03:22
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.