-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy path: multiThread_spider.py
More file actions
78 lines (76 loc) · 2.62 KB
/
multiThread_spider.py
File metadata and controls
78 lines (76 loc) · 2.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#coding:utf-8
import threading,Queue
import html_parser, html_outputer, html_downloader
import time
class Spider(object):
def __init__(self, tp, seed):
self.tp = tp
self.seed = seed
self.visited = set()
self.html_outputer = html_outputer.HtmlOutputer()
self.html_parser = html_parser.HtmlParser()
self.html_downloader = html_downloader.HtmlDownloader()
def parser(self, url):
if url in self.visited:
return
if len(self.visited) > 20:
return
print url, threading.current_thread()
try:
self.visited.add(url)
content = self.html_downloader.download(url)
urls, data = self.html_parser.parse(url, content)
self.html_outputer.collect_data(data)
for url in urls:
if url is None:
continue
self.tp.add_job(self.parser, url) #添加一个任务
except Exception, e:
print e
print 'download fail'
def work(self):
self.tp.add_job(self.parser, self.seed)
self.tp.wait_all_complete()
class ThreadingPool(object):
    """A fixed-size pool of worker threads fed from one shared job queue."""

    def __init__(self, num):
        # Queue.Queue is internally synchronised, so producers and the
        # worker threads can share it without extra locking.
        self.work_queue = Queue.Queue()
        self.thread_num = num
        # Workers begin consuming jobs as soon as they are constructed.
        self.threads = [Work(self.work_queue) for _ in xrange(num)]

    def add_job(self, func, *args):
        """Enqueue one job: a worker will call *func* with *args*."""
        self.work_queue.put((func, args))

    def wait_all_complete(self):
        """Block until every worker thread has drained the queue and exited."""
        for worker in self.threads:
            if not worker.is_alive():
                continue
            worker.join()
class Work(threading.Thread):
def __init__(self, work_queue, timeout=2):
super(Work, self).__init__()
self.work_queue = work_queue
self.timeout = timeout #等待任务队列多长时间
self.start()
def run(self):
while True:
#死循环,直到没有任务
try:
func, args = self.work_queue.get(timeout=self.timeout) #任务出队列,Queue内部实现了同步机制
func(*args) #执行任务
self.work_queue.task_done() #任务结束
except Exception,e:
print e
break
if __name__ == "__main__":
start = time.time()
seed = 'http://baike.baidu.com/view/21087.htm'
tp = ThreadingPool(3)
spider = Spider(tp, seed)
spider.work()
spider.html_outputer.output_html()
end = time.time()
print "cost all time: %s" % (end-start)