V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
zhijiansha
V2EX  ›  Python

尝试用 aiohttp 写爬虫,但这么写不知道该怎么停止循环?

  •  
  •   zhijiansha · 2018-10-25 20:31:07 +08:00 · 2759 次点击
    这是一个创建于 2246 天前的主题,其中的信息可能已经有所发展或是发生改变。

    使用 aiohttp 试着写了一个爬虫,但是发现可能会出现 一级页面还在抓取的时候,由于队列为空,直接退出的情况。不知该如何去做这个判断?另外不知以下代码这么写是否有其他的问题??

    # coding:utf-8
    
    import asyncio
    
    import aiohttp
    
    
    class Task(object):
        def __init__(self, info, priority):
            self.priority = priority
            self.info = info
    
        def __lt__(self, other):
            return self.priority < other.priority
    
    
    class Spider(object):
    
        def __init__(self, loop=None):
            self.loop = loop
            conn = aiohttp.TCPConnector(limit=3)
            self.session = aiohttp.ClientSession(loop=loop, connector=conn)
            self.queue = asyncio.PriorityQueue()
    
        def start(self, page):
            task_info = {'callback': self.parse_1, 'page': page}
            return task_info
    
        async def set_item(self, task):
            pass
    
        async def fetch(self, task):
            await asyncio.sleep(2)
            task['callback'](task['page'])
    
        async def worker(self):
            while True:
                next_task = await self.queue.get()
                if next_task.info.get('type') == 'item':
                    asyncio.ensure_future(self.set_item(next_task.info))
                else:
                    asyncio.ensure_future(self.fetch(next_task.info))
                self.queue.task_done()
    
                # if self.queue.empty():
                #     await asyncio.sleep(1)
                #     if self.queue.empty():
                #         break
    
        def run(self):
            for page in range(1, 10):
                self.queue.put_nowait(Task(self.start(page), 0))
    
            self.loop.run_until_complete(self.worker())
    
        def close(self):
            if not self.session.closed:
                if self.session._connector_owner:
                    self.session._connector.close()
                self.session._connector = None
    
        def parse_1(self, meta):
            print('parse_1-----', meta)
            for page in range(20, 30):
                task = {'callback': self.parse_2, 'page': page}
                self.queue.put_nowait(Task(task, 1))
    
        def parse_2(self, meta):
            print('parse2----', meta)
            for page in range(30, 40):
                task = {'callback': self.parse_3, 'page': page}
                self.queue.put_nowait(Task(task, 0))
    
        def parse_3(self, meta):
            print('parse3----', meta)
    
    
    loop = asyncio.get_event_loop()
    sp = Spider(loop=loop)
    sp.run()
    sp.close()
    
    
    16 条回复    2018-11-14 16:06:23 +08:00
    jimmyczm
        1
    jimmyczm  
       2018-10-25 20:37:22 +08:00
    我看别人写的是先把队列放到 redis 里,对 redis 进行判断从而进行开始或终止
    zhijiansha
        2
    zhijiansha  
    OP
       2018-10-25 21:07:43 +08:00
    @jimmyczm 感觉用 redis 应该也会有这个问题。。。除非是标记一下任务状态
    AlisaDestiny
        3
    AlisaDestiny  
       2018-10-25 23:45:37 +08:00
    woker 函数里别 while True;弄一个标记,如果为 True 继续,为 False 就停止,
    zhijiansha
        4
    zhijiansha  
    OP
       2018-10-26 00:03:33 +08:00 via iPhone
    @AlisaDestiny 但爬取何时完成,无法预料,也就无法给设置为 False 啊
    binux
        5
    binux  
       2018-10-26 00:51:01 +08:00
    加个 work in progress 标记
    so1n
        6
    so1n  
       2018-10-26 01:03:02 +08:00 via Android
    手机看的好难受,没看完,你试试 work 那里使用 wait_for 替换 ensure_future
    Yourshell
        7
    Yourshell  
       2018-10-26 09:02:40 +08:00 via iPhone
    判断任务队列用 join 而不是 empty
    zhijiansha
        8
    zhijiansha  
    OP
       2018-10-26 10:22:17 +08:00
    @binux 请教一下这个标记的动作应该放在哪里执行??
    @so1n 替换后无法执行。。。

    @Yourshell 呃,判断队列为空不是 empty 么?
    Yourshell
        9
    Yourshell  
       2018-10-26 10:35:53 +08:00 via iPhone
    @zhijiansha empty 是判断队列是否为空,join 是阻塞至所有任务完成,也就是调用 task_done。你用 empty 判断队列为空,只是所有的任务都被 get 了,不代表已经完成了。你可以看看官方的例子 https://docs.python.org/3/library/asyncio-queue.html
    binux
        10
    binux  
       2018-10-26 11:08:09 +08:00
    @zhijiansha #8 任何 worker 没处理完之前都是 WIP 啊,除了要判断队列是否为空,还要判断是否有任务是 WIP
    zhijiansha
        11
    zhijiansha  
    OP
       2018-10-26 23:05:16 +08:00
    @Yourshell #9 谢回复!代码中是在 while 中从队列获取任务直接注册到事件循环,然后就执行了 task_done。这么如果用 join 去判断的话应该也是一样的,我尝试用链接中的例子去改写上面的爬虫代码,但是好像也是行不通,会一直阻塞。不知何故。

    @binux #10 谢回复!判断任务状态,在官方文档中找到了 all_tasks 和 current_task 这两个方法,但是好像不好使,即使任务全部完成也不为 None,导致判断失败。。
    zhijiansha
        12
    zhijiansha  
    OP
       2018-10-29 11:37:33 +08:00
    ```
    async def worker(self):
    """
    任务调度
    :return:
    """
    while True:
    if not self.queue.empty():
    next_data = await self.queue.get()
    task_id = uuid.uuid1()
    self.task_running_list.append(task_id)
    if isinstance(next_data, Item):
    asyncio.create_task(self.set_item(task_id, next_data.info))
    else:
    asyncio.create_task(self.fetch(task_id, next_data.info))
    self.queue.task_done()
    else:
    await asyncio.sleep(0)
    if self.queue.empty() and not self.task_running_list:
    break
    ```
    根据 @binux 的提示,这么处理可解决该问题。
    binux
        13
    binux  
       2018-10-29 11:41:02 +08:00
    @zhijiansha #12 处理完要把 task_id 删掉啊
    zhijiansha
        14
    zhijiansha  
    OP
       2018-10-29 12:32:55 +08:00 via iPhone
    @binux 嗯嗯,删除操作是在 fetch 方法里面执行的
    a65420321a
        15
    a65420321a  
       2018-10-30 16:11:57 +08:00
    怎么贴的代码?
    Harlaus
        16
    Harlaus  
       2018-11-14 16:06:23 +08:00
    建议 aio+mq
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1051 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 22:41 · PVG 06:41 · LAX 14:41 · JFK 17:41
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.