我将如何在实时场景中使用并发。期货和队列?

【字号: 日期:2024-03-07浏览:24作者:雯心
如何解决我将如何在实时场景中使用并发。期货和队列??

Python文档中的示例已扩展为可以从队列中进行工作。需要注意的更改是,此代码使用concurrent.futures.wait而不是concurrent.futures.as_completed允许在等待其他工作完成时开始新工作。

import concurrent.futuresimport urllib.requestimport timeimport queueq = queue.Queue()URLS = [’http://www.foxnews.com/’,’http://www.cnn.com/’,’http://europe.wsj.com/’,’http://www.bbc.co.uk/’,’http://some-made-up-domain.com/’]def Feed_the_workers(spacing): ''' Simulate outside actors sending in work to do, request each url twice ''' for url in URLS + URLS:time.sleep(spacing)q.put(url) return 'DONE FeedING'def load_url(url, timeout): ''' Retrieve a single page and report the URL and contents ''' with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()# We can use a with statement to ensure threads are cleaned up promptlywith concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # start a future for a thread which sends work in through the queue future_to_url = {executor.submit(Feed_the_workers, 0.25): ’FeedER DONE’} while future_to_url:# check for status of the futures which are currently workingdone, not_done = concurrent.futures.wait( future_to_url, timeout=0.25, return_when=concurrent.futures.FirsT_COMPLETED)# if there is incoming work, start a new futurewhile not q.empty(): # fetch a url from the queue url = q.get() # Start the load operation and mark the future with its URL future_to_url[executor.submit(load_url, url, 60)] = url# process any completed futuresfor future in done: url = future_to_url[future] try:data = future.result() except Exception as exc:print(’%r generated an exception: %s’ % (url, exc)) else:if url == ’FeedER DONE’: print(data)else: print(’%r page is %d bytes’ % (url, len(data))) # remove the Now completed future del future_to_url[future]

url两次获取的输出:

’http://www.foxnews.com/’ page is 67574 bytes’http://www.cnn.com/’ page is 136975 bytes’http://www.bbc.co.uk/’ page is 193780 bytes’http://some-made-up-domain.com/’ page is 896 bytes’http://www.foxnews.com/’ page is 67574 bytes’http://www.cnn.com/’ page is 136975 bytesDONE FeedING’http://www.bbc.co.uk/’ page is 193605 bytes’http://some-made-up-domain.com/’ page is 896 bytes’http://europe.wsj.com/’ page is 874649 bytes’http://europe.wsj.com/’ page is 874649 bytes解决方法

concurrent.futures如下所示,使用Python 3的模块进行并行工作非常容易。

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: future_to = {executor.submit(do_work,input,60): input for input in dictionary} for future in concurrent.futures.as_completed(future_to):data = future.result()

将项目插入和检索到队列中也非常方便。

q = queue.Queue()for task in tasks:q.put(task)while not q.empty(): q.get()

我有一个脚本在后台运行,以监听更新。现在,理论上假设,随着这些更新的到来,我将对它们进行排队,并使用进行并发处理ThreadPoolExecutor。

现在,单独地,所有这些组件都是独立工作的,并且很有意义,但是我如何一起使用它们呢?我不知道是否有可能ThreadPoolExecutor实时从队列中馈送工作,除非预先确定要工作的数据?

简而言之,我要做的就是,每秒接收4条消息的更新,将它们推送到队列中,并让我的current.futures对其进行处理。如果我不这样做,那我就会陷入缓慢的顺序方法中。

让我们以下面的Python文档中的规范示例为例:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url,url,60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try: data = future.result()except Exception as exc: print(’%r generated an exception: %s’ % (url,exc))else: print(’%r page is %d bytes’ % (url,len(data)))

的列表URLS是固定的。是否可以实时提供此列表,并让工作人员从列表中进行处理,也许出于管理目的而从队列中进行处理?我有点困惑我的方法是否 真能 ?

相关文章: