Python 多线程和多进程爬虫

前面的 《Python 的网络请求 Requests 模块使用》 我们已经对爬虫的基本过程有了了解,而且整个过程比较顺畅,容易理解,但是很多情况下我们的爬取工作并不是如此轻松的,我们可能会面临一些特殊情况,例如今天要说的速度问题,如果有大量的页面需要爬取,我们还使用单一线程就很慢了。

多线程爬虫

我们先不考虑特别大型的爬取,本文所有的思路都是基于满足个人需求的中型爬取需要。我们可以使用多线程来并行爬取,这样就可以成倍的提高爬取速度。

我们先来感受一下如何使用多线程:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import _thread
import time

def print_time(threadName, delay):
    count = 0
    while count < 3:
        time.sleep(delay)
        count += 1
        print(threadName, time.ctime())

_thread.start_new_thread(print_time, ("Thread-1", 1))
_thread.start_new_thread(print_time, ("Thread-2", 2))

_thread.start_new_thread(function, args[, kwargs])

开启一个新线程并返回其标识。 线程执行函数 function 并附带参数列表 args (必须是元组)。 可选的 kwargs 参数指定一个关键字参数字典。当函数返回时,线程会静默地退出。当函数因某个未处理异常而终结时,sys.unraisablehook() 会被调用以处理异常。 钩子参数的 object 属性为 function。 在默认情况下,会打印堆栈回溯然后该线程将退出(但其他线程会继续运行)。当函数引发 SystemExit 异常时,它会被静默地忽略。

_thread 类是线程的基础类,事实上我们大可不必使用它,我们完全可以使用 threading 模块提供的 Thread 类来实现多线程操作。

threading 模块提供的常用方法如下:

  • threading.currentThread():返回当前的线程变量。
  • threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

除了使用方法外,线程模块同样提供了 Thread 类来处理线程,Thread 类提供了以下方法:

  • run():用以表示线程活动的方法。
  • start():启动线程活动。
  • join([time]):等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
  • isAlive():返回线程是否活动的。
  • getName():返回线程名。
  • setName():设置线程名。

使用 threading 启动两个线程来实现上面的逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import threading
import time

class myThread(threading.Thread):
    
    def __init__(self, name, delay):
        threading.Thread.__init__(self)
        self.name = name
        self.delay = delay

    def run(self):
        print("Starting " + self.name)
        print_time(self.name, self.delay)
        print("Exiting " + self.name)

def print_time(threadName, delay):
    count = 0
    while count < 3:
        time.sleep(delay)
        count += 1
        print(threadName, time.ctime())               


threads = []

threads.append(myThread("Thread-1", 1))
threads.append(myThread("Thread-2", 2))

for t in threads:
    t.start()

for t in threads:
    t.join()

print("Exiting Main Thread")

假设现在有 1000 个页面需要去爬取,我们可以将这个任务分配给多个线程同时执行。但是,这样显然是有个问题的,如果我们平均分配可能某个线程执行速度快,执行完后就会闲置,此时我们去动态分配可能要好一点。

你可以试想,如果这里有一个队列来寄存我们的任务,每一个线程执行完成后都去这个队列领取任务,这样的话就会形成动态分配任务。

Queue 是 python 标准库中的线程安全的队列 FIFO (First in First Out) 实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递。

Queue 提供了一个基本的 FIFO 容器,使用方法很简单,maxsize 是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果 maxsize 小于或者等于 0,队列大小没有限制。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import threading
import queue
import time
import random

class myThread(threading.Thread):
    
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name

    def doTask(self, threadName, taskName):
        time.sleep(random.randint(1, 3))
        print(threadName + "----do" + taskName + "\n", end='')

    def run(self):
        print("Starting " + self.name + "\n")
        while True:
            self.doTask(self.name, taskQueue.get(timeout=1))
        print("Exiting " + self.name + "\n")


threads = []
taskQueue = queue.Queue(1000)

for i in range(1, 1001):
    taskQueue.put("Task-" + str(i))

for i in range(1, 6):
    threads.append(myThread("Thread" + str(i)))
    
for t in threads:
    t.start()

for t in threads:
    t.join()

多线程看似能解决大部分速度问题,实质上这里并没有利用到 CPU 的多核特性,如果要利用 CPU 多核特性就需要使用多进程来实现了。

多进程爬虫

multiprocessing 是一个用于产生进程的包,具有与 threading 模块相似 API。 multiprocessing 包同时提供本地和远程并发,使用子进程代替线程,有效避免 Global Interpreter Lock 带来的影响。因此, multiprocessing 模块允许程序员充分利用机器上的多核。可运行于 Unix 和 Windows 。

1
2
3
4
5
6
#!/usr/bin/python
# -*- coding: UTF-8 -*-

import multiprocessing

print("CPU 数量: " + str(multiprocessing.cpu_count()))

接下来我们使用多进程来实现上面的例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#!/usr/bin/python
# -*- coding: UTF-8 -*-

from multiprocessing import Process, Queue
import queue
import time
import random

startTime = time.time()
print("Start All Process")

class myProcess(Process):
    
    def __init__(self, name, q):
        Process.__init__(self)
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name + "\n")
        while not self.q.empty():
            doTask(self.name, self.q.get(timeout=2))
        print("Exiting " + self.name + "\n")

def doTask(processName, taskName):
    time.sleep(random.randint(1, 3))
    print(processName + "----do" + taskName + "\n", end='')

if __name__ == '__main__':
    
    taskQueue = Queue(20)

    for i in range(1, 21):
        taskQueue.put("Task-" + str(i))
        
    for i in range(1, 7):
        p = myProcess("Process" + str(i), taskQueue)
        p.daemon = True
        p.start()
        
        p.join()
        
endTime = time.time()
print("总共用时(ms):" + str(endTime - startTime))
print("End All Process")