V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
simple221
V2EX  ›  Python

Python 多线程+队列的问题

  •  
  •   simple221 · 2017-02-03 16:24:26 +08:00 · 1835 次点击
    这是一个创建于 2897 天前的主题,其中的信息可能已经有所发展或是发生改变。
    # -*- coding: UTF-8 -*-
    import re
    import sys
    import threading
    import traceback
    import random
    import time
    import subprocess
    import shlex
    try:
        import Queue            # Python 2
    except ImportError:
        import queue as Queue   # Python 3
    class NoResultsPending(Exception):
        """Raised by ThreadPool.poll() when no submitted request is still awaiting a result."""
    
    class NoWorkersAvailable(Exception):
        """Raised by a blocking ThreadPool.poll() when the pool has no worker threads left."""
    def _handle_thread_exception(request, exc_info):
        """Default exception callback: print the traceback described by *exc_info*.

        *request* is accepted so the signature matches other exception
        callbacks, but it is not used. *exc_info* is expected to be the
        ``(type, value, traceback)`` triple produced by ``sys.exc_info()``.
        """
        exc_details = tuple(exc_info)
        traceback.print_exception(*exc_details)
    def makeRequests(callable_, args_list, callback=None,
            exc_callback=_handle_thread_exception):
        """Build one WorkRequest per entry of *args_list*, all calling *callable_*.

        A tuple entry is read as ``(positional_args, keyword_args)``; any other
        entry is passed to *callable_* as its single positional argument.
        *callback* and *exc_callback* are attached to every request.

        Returns the list of created requests, in the order of *args_list*.
        """
        def _build(entry):
            # Tuples carry explicit (args, kwds); everything else is one bare argument.
            if isinstance(entry, tuple):
                positional, keywords = entry[0], entry[1]
            else:
                positional, keywords = [entry], None
            return WorkRequest(callable_, positional, keywords,
                               callback=callback, exc_callback=exc_callback)

        return [_build(entry) for entry in args_list]
    class WorkerThread(threading.Thread):
        """Daemon thread that runs work requests taken from a shared queue.

        Each request is expected to expose ``callable``, ``args`` and ``kwds``;
        the worker calls ``request.callable(*request.args, **request.kwds)`` and
        puts ``(request, result)`` on the results queue. If the call raises, the
        worker sets ``request.exception = True`` and puts
        ``(request, sys.exc_info())`` on the results queue instead.
        """

        def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
            """Store the queues and start the thread immediately.

            *poll_timeout* is the number of seconds a single blocking read of
            the requests queue may take before the dismissal flag is checked
            again. Extra keyword arguments go to ``threading.Thread.__init__``.
            """
            threading.Thread.__init__(self, **kwds)
            # The ``daemon`` attribute replaces the deprecated ``setDaemon()``.
            self.daemon = True
            self._requests_queue = requests_queue
            self._results_queue = results_queue
            self._poll_timeout = poll_timeout
            self._dismissed = threading.Event()
            self.start()

        def run(self):
            """Process requests until the worker has been dismissed."""
            while True:
                if self._dismissed.is_set():
                    # we are dismissed, break out of loop
                    break
                try:
                    request = self._requests_queue.get(True, self._poll_timeout)
                except Queue.Empty:
                    continue
                except Exception:
                    # Unexpected queue failure: report it instead of hiding it,
                    # then keep the worker alive as before.
                    traceback.print_exc()
                    continue
                if self._dismissed.is_set():
                    # Dismissed while waiting: hand the request back for another worker.
                    self._requests_queue.put(request)
                    break
                try:
                    result = request.callable(*request.args, **request.kwds)
                    self._results_queue.put((request, result))
                except:
                    # Deliberately catch everything: whatever the callable raises
                    # must be delivered as a result, otherwise the request would
                    # stay pending forever.
                    request.exception = True
                    self._results_queue.put((request, sys.exc_info()))

        def dismiss(self):
            """Ask the thread to exit once it has finished its current request."""
            self._dismissed.set()
    
    class WorkRequest:
        """A callable bundled with its arguments and optional result callbacks.

        ``exception`` starts as False; worker threads set it to True when the
        callable raises.
        """

        def __init__(self, callable_, args=None, kwds=None, requestID=None,
                callback=None, exc_callback=_handle_thread_exception):
            """Record the call to perform and how its outcome is reported.

            When *requestID* is omitted, ``id(self)`` is used; otherwise its
            hash is stored. Raises TypeError if *requestID* is not hashable.
            """
            if requestID is not None:
                try:
                    self.requestID = hash(requestID)
                except TypeError:
                    raise TypeError("requestID must be hashable.")
            else:
                self.requestID = id(self)
            self.callable = callable_
            # Falsy args/kwds (None or empty) are replaced by fresh empty containers.
            self.args = args if args else []
            self.kwds = kwds if kwds else {}
            self.callback = callback
            self.exc_callback = exc_callback
            self.exception = False

        def __str__(self):
            """Return a one-line summary of the request."""
            template = "<WorkRequest funname=%s id=%s args=%s kwargs=%s exception=%s>"
            fields = (self.callable, self.requestID, self.args, self.kwds,
                      self.exception)
            return template % fields
    
    class ThreadPool:
        """Pool of worker threads fed from a request queue.

        Results come back on a second queue and are dispatched to each
        request's callbacks by poll() / wait().
        """

        def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
            """Create both queues and start *num_workers* worker threads.

            *q_size* and *resq_size* are passed to ``Queue.Queue`` as the
            maximum sizes of the request and result queues.
            """
            self.workers = []
            self.dismissedWorkers = []
            self.workRequests = {}
            self._requests_queue = Queue.Queue(q_size)
            self._results_queue = Queue.Queue(resq_size)
            self.createWorkers(num_workers, poll_timeout)

        def createWorkers(self, num_workers, poll_timeout=5):
            """Add *num_workers* new worker threads to the pool."""
            for _ in range(num_workers):
                new_worker = WorkerThread(self._requests_queue,
                                          self._results_queue,
                                          poll_timeout=poll_timeout)
                self.workers.append(new_worker)

        def dismissWorkers(self, num_workers, do_join=False):
            """Dismiss up to *num_workers* workers, taken from the end of the list.

            With *do_join* the dismissed threads are joined right away;
            otherwise they are kept in ``dismissedWorkers`` for a later
            joinAllDismissedWorkers() call.
            """
            how_many = min(num_workers, len(self.workers))
            dismissed = []
            for _ in range(how_many):
                leaving = self.workers.pop()
                leaving.dismiss()
                dismissed.append(leaving)

            if not do_join:
                self.dismissedWorkers.extend(dismissed)
                return
            for leaving in dismissed:
                leaving.join()

        def joinAllDismissedWorkers(self):
            """Join every previously dismissed worker and clear the list."""
            for leaving in self.dismissedWorkers:
                leaving.join()
            self.dismissedWorkers = []

        def putRequest(self, request, block=True, timeout=None):
            """Queue *request* for execution and remember it by its requestID."""
            assert isinstance(request, WorkRequest)
            # A request whose exception flag is set has already run; refuse to reuse it.
            assert not getattr(request, 'exception', None)
            self._requests_queue.put(request, block, timeout)
            self.workRequests[request.requestID] = request

        def poll(self, block=False):
            """Dispatch available results to the callbacks of their requests.

            Raises NoResultsPending when no request is awaiting a result, and
            NoWorkersAvailable when *block* is true but the pool has no
            workers. Returns once the results queue is empty.
            """
            while True:
                if not self.workRequests:
                    # Nothing is outstanding any more.
                    raise NoResultsPending
                if block and not self.workers:
                    # Blocking would never end: nobody is left to produce results.
                    raise NoWorkersAvailable
                try:
                    request, result = self._results_queue.get(block=block)
                    failed_with_handler = request.exception and request.exc_callback
                    if failed_with_handler:
                        # For a failed request, result holds what the worker queued.
                        request.exc_callback(request, result)
                    elif request.callback:
                        request.callback(request, result)
                    del self.workRequests[request.requestID]
                except Queue.Empty:
                    break

        def wait(self):
            """Block until every pending request has delivered its result."""
            while True:
                try:
                    self.poll(True)
                except NoResultsPending:
                    return
    def func1(domain):
        """Run ``tracert`` against *domain* and return its captured standard output.

        Returns whatever ``Popen.communicate()`` reports for stdout, or None
        when the command line cannot be parsed or the process cannot be run.
        The failure is printed as a traceback rather than silently dropped.
        """
        cmd = 'tracert  %s ' % domain
        # Pre-bind the result so a failure below returns None instead of
        # raising UnboundLocalError at the return statement.
        outroute1 = None
        try:
            proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE)
            outroute1, _ = proc.communicate()
        except Exception:
            traceback.print_exc()
        return outroute1
    def do_something(data):
        """Sleep for ``data * 5`` seconds, then return a random value.

        The value is ``random.random() * data`` rounded to 5 decimal places.
        """
        delay = data * 5
        time.sleep(delay)
        return round(random.random() * data, 5)
    # NOTE(review): a ``global`` statement at module level has no effect, so
    # these two lines declare nothing. ``getrnum`` is not referenced anywhere
    # else in the visible code -- confirm before removing.
    global requests
    global getrnum
    def callfunction(mytaskpool,funname,data):
        """Queue one work request per entry of *data*, then poll once a batch is full.

        Every queued request increments the module-level counters ``iIndex``
        and ``iwhile``. When ``iIndex`` equals ``SimuRunCount`` after queueing,
        the pool is polled every 0.5 seconds until ``NoResultsPending`` is
        raised or the user interrupts. While polling, ``SimuRunCount`` extra
        workers are created on the iteration where ``iwhile`` equals
        ``SimuRunCount``, and two workers are dismissed when ``iwhile``
        reaches 20. Both counters are reset to 0 afterwards.

        Results go to ``print_result`` and failures to ``handle_exception``.
        """
        global SimuRunCount, iIndex, iwhile
        requests = makeRequests(funname, data, print_result, handle_exception)
        # Read the name from the callable itself; parsing repr() text raised
        # IndexError for anything that is not a plain function.
        funNamet = (getattr(funname, '__qualname__', None)
                    or getattr(funname, '__name__', None)
                    or str(funname))
        for req in requests:
            iwhile = iwhile + 1
            iIndex = iIndex + 1
            mytaskpool.putRequest(req)
            print("Work request #funName %s #params %s #id %s added." % (funNamet, req.args, req.requestID))
        if iIndex == SimuRunCount:
            while True:
                try:
                    time.sleep(0.5)
                    mytaskpool.poll()
                    if iwhile == SimuRunCount:
                        mytaskpool.createWorkers(SimuRunCount)
                    if iwhile == 20:
                        mytaskpool.dismissWorkers(2)
                    iwhile += 1
                except KeyboardInterrupt:
                    print("**** Interrupted!")
                    break
                except NoResultsPending:
                    break
            iIndex = 0
            iwhile = 0
    def getreuslt(getnum):
        """Remove and return up to *getnum* of the oldest entries of ``finshedarrylist``.

        Returns an empty list when *getnum* is not positive. If fewer than
        *getnum* entries are stored, all of them are returned. The module-level
        list is modified in place.
        """
        getresult = []
        if getnum > 0:
            # Slicing clamps to the list length, and deleting the same slice
            # removes exactly the returned entries without an equality search
            # per item.
            getresult = finshedarrylist[:getnum]
            del finshedarrylist[:getnum]
        return getresult
    
    
    def print_result(request, result):
        """Result callback: print one finished request and record it.

        A dict with the keys ``request``, ``params``, ``funname`` and
        ``result`` is appended to the module-level ``finshedarrylist``.
        ``params`` is ``str(request.args)`` with commas and parentheses removed.
        """
        # Read the name and arguments from the request itself. Parsing
        # str(request) with a greedy regex corrupted the name whenever the
        # argument text contained "at", and raised IndexError for callables
        # that are not plain functions.
        func = request.callable
        funName = (getattr(func, '__qualname__', None)
                   or getattr(func, '__name__', None)
                   or str(func))
        argslist = str(request.args).replace(',', '').replace('(', '').replace(')', '')
        finshExecArray = {
            'request': request.requestID,
            'params': argslist,
            'funname': funName,
            'result': result,
        }
        print("**** Result from request #%s:%s:%s:%s" % (request.requestID,funName,argslist, result))
        finshedarrylist.append(finshExecArray)
    # Exception callback used for requests whose callable raised in a worker thread.
    def handle_exception(request, exc_info):
        """Print a one-line report for a failed request.

        *exc_info* must be a tuple. For anything else, the request and the
        value are printed and SystemExit is raised.
        """
        if isinstance(exc_info, tuple):
            report = "**** Exception occured in request #%s: %s" % \
                (request.requestID, exc_info)
            print(report)
            return
        # Something is seriously wrong: dump what was received and stop.
        print(request)
        print(exc_info)
        raise SystemExit
    # Batch size: callfunction() starts polling once this many requests are queued.
    SimuRunCount = 2
    # Finished-request records appended by print_result() and consumed by getreuslt().
    finshedarrylist=[]
    if __name__ == '__main__':
        # Counters that callfunction() updates through ``global``.
        iIndex = 0
        iwhile = 0
        requests = None
        FinishArray=[]
        mytaskpool = ThreadPool(SimuRunCount)
        data1 = [((3,), {}), ((5,), {})]
        callfunction(mytaskpool, do_something, data1)
        dataurl = [(('www.lessnet.cn',), {})]
        callfunction(mytaskpool, func1,dataurl)
        data = [((6,), {})]
        callfunction(mytaskpool, do_something, data)
        data = [((7,), {})]
        callfunction(mytaskpool, do_something, data)
        # print() with a single argument behaves the same on Python 2 and 3;
        # the former print statements were a SyntaxError on Python 3.
        print(finshedarrylist)
        getreusltlist = getreuslt(SimuRunCount)
        print(getreusltlist)
        print(finshedarrylist)
        if mytaskpool.dismissedWorkers:
            mytaskpool.joinAllDismissedWorkers()
        print("Joining all dismissed worker threads...")
    

    目前只能等设定数量的线程全部完成后,才会执行下一轮。应该如何修改,才能在已有线程完成时(例如完成了 5 个),继续把等待中的任务放入执行队列?

    3 条回复    2017-03-04 16:01:55 +08:00
    wwqgtxx
        1
    wwqgtxx  
       2017-02-03 19:12:17 +08:00 via iPhone
    直接用 concurrent.futures 类库不就行了,还有你这个排版…
    simple221
        2
    simple221  
    OP
       2017-02-16 17:53:15 +08:00
    :-D 我直接复制这段代码过来的,然后就成这样了。
    Livid
        3
    Livid  
    MOD
       2017-03-04 16:01:55 +08:00
    @simple221 帮你编辑了一下, V2EX 是支持 Markdown 代码高亮的。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3504 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 36ms · UTC 04:45 · PVG 12:45 · LAX 20:45 · JFK 23:45
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.