
Scraping 1000 links concurrently with Python's concurrent.futures

Dawn.Sahil • 3 years ago • 1121 views

I am trying to extract data from about 1000 links that all have the same content and the same extraction process. To speed things up I am using Python's concurrent.futures, which I believe is the best choice for speed. It works when I fetch data from about 30-40 links, but not once the count grows beyond that. Here is my code:

import re
import json
import requests
import concurrent.futures
import time

links_json = ['https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/485387/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/485256/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/487113/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/486733/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/486937/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/486946/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/485444/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/487258/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/487011/',
'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/487254/']

MAX_THREADS = 30

Data_Source = "RASFF"
Product_Category = []
Date = []
Product_name = []
Reference = []

def scrape(links):
    data = requests.get(links).json()
    Product_Category.append(data["product"]["productCategory"]["description"])
    Date.append(data["ecValidationDate"])
    Product_name.append(data["product"]["description"])
    Reference.append(data["reference"])

        
def download_data(links_json):
    threads = min(MAX_THREADS, len(links_json))

    with concurrent.futures.ThreadPoolExecutor(max_workers=threads) as executor:
        executor.map(scrape, links_json)

def main(new_links):
    t0 = time.time()
    download_data(new_links)
    t1 = time.time()
    print(f"{t1-t0} seconds to crawl {len(new_links)} in total.")

main(links_json)

When I try to run the main function, the results are very inconsistent. Right now there are only 12 links to scrape, but as the number of links grows, the amount of data that actually lands in the lists shrinks. For example: with about 200 links, Product_Category should end up with 200 entries, but sometimes there are 100, 67, and so on, so it is very inconsistent. I am not sure what I am missing. I even tried adding time.sleep(0.25), but it did not help. I also do not know how to feed a list of 500-1000 links in here.
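
For reference, one likely cause: Executor.map only re-raises an exception from a worker when its result iterator is consumed, and this code never consumes the iterator, so any request that fails (timeout, connection error, rate limiting) silently drops its items from the lists. Below is a minimal sketch that surfaces those errors and returns each record as one unit; the dictionary field names and the timeout value are illustrative additions, not part of the original code.

import concurrent.futures
import requests

def scrape(link):
    # timeout is an assumption; without one, a hung socket also "loses" items
    data = requests.get(link, timeout=10).json()
    # returning one dict per link keeps a record's fields together,
    # instead of appending to four separate global lists
    return {
        "category": data["product"]["productCategory"]["description"],
        "date": data["ecValidationDate"],
        "name": data["product"]["description"],
        "reference": data["reference"],
    }

def download_data(links):
    records, failed = [], []
    with concurrent.futures.ThreadPoolExecutor(max_workers=30) as executor:
        futures = {executor.submit(scrape, link): link for link in links}
        for future in concurrent.futures.as_completed(futures):
            try:
                records.append(future.result())  # re-raises worker exceptions
            except Exception as exc:
                failed.append((futures[future], exc))  # see what actually broke
    return records, failed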

Replies [ 1 ]  |  Latest reply 3 years ago
user2668284  •  Reply #1  •  4 years ago

Here is an example using the threading module:

import requests
import threading

Product_Category = []
Date = []
Product_name = []
Reference = []
AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 11_5_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.2 Safari/605.1.15'
BASEURL = 'https://webgate.ec.europa.eu/rasff-window/backend/public/notification/view/id/'
LOCK = threading.Lock()

headers = {'User-Agent': AGENT}
links = ['485387',
         '485256',
         '487113',
         '486733',
         '486937',
         '486946',
         '485444',
         '487258',
         '487011',
         '487254']


def scrape(session, link):
    response = session.get(f'{BASEURL}{link}/', headers=headers)
    response.raise_for_status()
    json = response.json()
    # Hold the lock while appending so the four lists stay index-aligned.
    with LOCK:
        Product_Category.append(
            json["product"]["productCategory"]["description"])
        Date.append(json["ecValidationDate"])
        Product_name.append(json["product"]["description"])
        Reference.append(json["reference"])


def main():
    with requests.Session() as session:
        ta = []
        for link in links:
            t = threading.Thread(target=scrape, args=(session, link))
            ta.append(t)
            t.start()
        for t in ta:
            t.join()
        print(Product_Category)
        print(Date)
        print(Product_name)
        print(Reference)


if __name__ == '__main__':
    main()
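
One caveat if this is scaled back up to 1000 links: main() starts one thread per link, so every request runs at once, which invites timeouts and rate limiting. Here is a sketch of the same approach with the worker count capped by a thread pool; main_bounded and max_workers are illustrative names, not from the answer above.

from concurrent.futures import ThreadPoolExecutor

def main_bounded(max_workers=30):
    with requests.Session() as session:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # reuses scrape() and the shared Session from the answer above
            futures = [executor.submit(scrape, session, link) for link in links]
        for f in futures:
            try:
                f.result()  # re-raise any HTTP error instead of dropping it
            except requests.RequestException as exc:
                print('request failed:', exc)
    print(len(Reference), 'records collected')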