python3 提高爬虫采集速度 方案三:多进程 + 队列

多进程使用注意点

1.多进程说明 进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。

2.使用多进程后的通信 多进程中使用普通的队列模块无法实现进程间的通讯,因为进程是系统分配资源的基本单元. 对应的需要使用multiprocessing提供的JoinableQueue模块,其使用过程和在线程中使用的queue方法相同

案列使用说明

queue = Queue() # 普通队列无法实现多进程间通信

多进程中要使用multiprocessing.JoinableQueue

from multiprocessing import Process import multiprocessing queue = multiprocessing.JoinableQueue()

def add_to_queue(): for i in range(0, 10): print(“向队列中添加: {}”.format(i)) queue.put(i)

def get_queue(): for i in range(0, 10): print(‘从队列中获取: {}’.format(queue.get())) queue.task_done()

p = Process(target=add_to_queue) p.daemon = True p.start()

p = Process(target=get_queue) p.daemon = True p.start()

让任务能够开始执行

time.sleep(0.01) queue.join() 3. 实现多进程的糗事百科爬虫 把多线程版修改多进程版:

把原来的threading.Thread修改为multiprocessing.Process 把queue.Queue修改为multiprocessing.JoinableQueue

代码

import requests
from lxml import etree
import json
from multiprocessing import JoinableQueue
from multiprocessing import Process
import time

def run_forever(func):
    def wrapper(obj):
        while True:
            func(obj)
    return wrapper


class JiubaiSpider(object):

    def __init__(self):
        self.headers = {
            User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36
        }
        self.url_pattern = https://www.Jiushaike.com/8hr/page/{}/
        # url 队列
        self.url_queue = JoinableQueue()
        # 响应队列
        self.page_queue = JoinableQueue()
        # 数据队列
        self.data_queue = JoinableQueue()


    def add_url_to_queue(self):
        # 把URL添加url队列中
        for i in range(1, 14):
            self.url_queue.put(self.url_pattern.format(i))

    @run_forever
    def add_page_to_queue(self):
         发送请求获取数据 
        url = self.url_queue.get()
        pass
        self.url_queue.task_done()

    @run_forever
    def add_dz_to_queue(self):
        根据页面内容使用lxml解析数据, 获取段子列表
        page = self.page_queue.get()
        pass
        self.page_queue.task_done()

    def get_first_element(self, list):
        获取列表中第一个元素,如果是空列表就返回None
        return list[0] if len(list) != 0 else None

    @run_forever
    def save_dz_list(self):
        把段子信息保存到文件中
        dz_list = self.data_queue.get()
        pass
        self.data_queue.task_done()


    def run_use_more_task(self, func, count=1):
        把func放到进程中执行, count:开启多少进程执行
        for i in range(0, count):
            t = Process(target=func)
            t.daemon = True
            t.start()

    def run(self):
        # 开启线程执行上面的几个方法
        self.run_use_more_task(self.add_url_to_queue)
        self.run_use_more_task(self.add_page_to_queue, 3)
        self.run_use_more_task(self.add_dz_to_queue, 2)
        self.run_use_more_task(self.save_dz_list, 2)

        # 让主线线程等待0.001s让有时间想队列中添加数据
        time.sleep(0.001)
        # 使用队列join方法,等待队列任务都完成了才结束
        self.url_queue.join()
        self.page_queue.join()
        self.data_queue.join()

if __name__ == __main__:
    qbs = JiubaiSpider()
    qbs.run()
经验分享 程序员 微信小程序 职场和发展