V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
mactec
V2EX  ›  Python

tornado 拿到 gen.return 异步返回的结果后,没有在 yield 的地方恢复继续执行

  •  
  •   mactec · 2017-08-29 13:04:31 +08:00 · 2699 次点击
    这是一个创建于 2670 天前的主题,其中的信息可能已经有所发展或是发生改变。

    请教大伙一个问题,我调试了半天,发现 gen.return 返回异步结果后,程序不是接着 yield 的地方执行,而是又跳到 ioloop,tornado 初学,文档较少,希望大家能帮忙指出错误在哪,感激 部分程序如下


    @gen.coroutine
    def _fetch_and_extract(self, task):
        self.processing_task_number += 1
        self.processing_task_set.add(task)
        if self.processing_task_number != self.processing_task_set.size():ioloop.IOLoop.instance().stop()
        self.logger.info("%s: start to fetch and extract" % self.worker_name)
        if check_task(task):
            try:
                spider = load_object(task["spider"])
            except Exception, e:
                handle_fail_task(task,"load %s object failed" % (task["spider"]),self.process_fail_task_queue, self.wait_for_process_task_queue)
                self.logger.error("%s: load object failed.path:%s, exception:%s" % (self.worker_name, task["spider"], e))
            else:
                spider_object = spider(self.processed_url_set, self.wait_for_process_task_queue)
                fetch_start_time = datetime.datetime.now()
                resp = yield spider_object.fetch(task["request"])
                self.logger.debug("got resp already ”)#已打印
    	    #这里已经拿到异步返回的 resp 了,但是代码没有恢复接着往下走,而是直接回到 loop 去取 task 了,但是这时候 task 里空了,所以之后就一直提示任务空
    	    #相当于只抓了美团 api 的 city 列表,后面 extract、再添加 task 都没有执行,我 debug 时候比较奇怪这点,按道理异步返回 resp 了应该接着之前代码的位置继续执行 不是么
                if resp == None or resp.error != None:
                    handle_fail_task(task, "fetch request: %s failed,code:%d error:%s" % (task["request"], resp.code, resp.error), self.process_fail_task_queue, self.wait_for_process_task_queue)
                    self.logger.error("%s: fetch request: %s failed, error: %s" % (self.worker_name, task["request"], resp))
                else:
                    content_length = resp.headers['Content-Length'] if resp.headers.has_key('Content-Length') else None
                    last_modified = resp.headers['Last-Modified'] if resp.headers.has_key("Last-Modified") else None
                    fetch_time = datetime.datetime.now() - fetch_start_time
                    extract_start_time = datetime.datetime.now()
                    status = yield spider_object.extract(resp, **dict(task["kwargs"])
    
    3 条回复    2017-09-02 17:13:01 +08:00
    mactec
        1
    mactec  
    OP
       2017-08-29 13:08:04 +08:00
    mark 下,如果有答案了再来更新
    bluesky139
        2
    bluesky139  
       2017-08-30 08:18:03 +08:00
    肉眼表示没看出什么问题,你在那句打印出来 log 的地方以下每隔一行打 log 出来看看呢,或者能否精简个可独立运行的简陋版本出来
    mactec
        3
    mactec  
    OP
       2017-09-02 17:13:01 +08:00
    @bluesky139
    '''
    class Worker(object):
    def __init__(self,str):
    self._name = str
    self._installed = False
    def install(self):
    if self._installed:
    print "%s: no need to start again.worker has been installed!" % self._name
    else:
    self._installed = True
    ioloop.IOLoop.instance().add_callback(self.test)
    print "%s: worker is installed" % self._name
    @gen.coroutine
    def fetch(self,url):
    req = HTTPRequest(url,connect_timeout=3,request_timeout=5)
    client = httpclient.AsyncHTTPClient()
    resp = yield gen.Task(client.fetch, req)
    raise gen.Return(resp)

    @gen.coroutine
    def exact(self,resp):
    yield gen.sleep(5)
    raise gen.Return(10)

    @gen.coroutine
    def test(self):
    if self._installed:
    ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test)
    str = self._name
    task_url = u'http://api.caipiao.163.com/missNumber_trend.html?gameEn=kuai3'
    resp = yield self.fetch(task_url)
    print "%s get resp already at %s" %(str,datetime.datetime.now())
    staus = yield self.exact(resp)
    print "callback!!!%s status returned %d at %s" %(str,staus,datetime.datetime.now())


    if __name__ == '__main__':
    arrs = ['aaa','bbb']
    for arr in arrs:
    worker = Worker(arr)
    worker.install()
    ioloop.IOLoop.instance().start()

    '''

    谢谢,还在找原因
    请问 ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test)
    这个能让 test 方法定期执行么,测试结果这个间隔没有用
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3585 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 05:05 · PVG 13:05 · LAX 21:05 · JFK 00:05
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.