process_crawler.py
import time
import threading
import multiprocessing
from urllib.parse import urldefrag, urljoin

from mongo_cache import MongoCache
from mongo_queue import MongoQueue
from downloader import Downloader

SLEEP_TIME = 1  # seconds to wait between checks of the running threads
def threaded_crawler(seed_url, delay=5, cache=None, scrape_callback=None, user_agent='wswp', proxies=None, num_retries=1, max_threads=10, timeout=60):
    """Crawl from this seed URL using multiple threads
    """
    # the queue of URLs that still need to be crawled, shared through MongoDB
    crawl_queue = MongoQueue()
    crawl_queue.clear()
    crawl_queue.push(seed_url)
    D = Downloader(cache=cache, delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, timeout=timeout)

    def process_queue():
        while True:
            # claim the next URL in the queue and mark it as being processed
            try:
                url = crawl_queue.pop()
            except KeyError:
                # currently no URLs to process
                break
            else:
                html = D(url)
                if scrape_callback:
                    try:
                        links = scrape_callback(url, html) or []
                    except Exception as e:
                        print('Error in callback for: {}: {}'.format(url, e))
                    else:
                        for link in links:
                            # add this new link to the queue
                            crawl_queue.push(normalize(seed_url, link))
                crawl_queue.complete(url)

    # manage the pool of download threads until the queue is exhausted
    threads = []
    while threads or crawl_queue:
        # drop references to threads that have finished
        threads = [thread for thread in threads if thread.is_alive()]
        while len(threads) < max_threads and crawl_queue.peek():
            # can start some more threads
            thread = threading.Thread(target=process_queue)
            thread.daemon = True  # daemon threads let the main thread exit on ctrl-c
            thread.start()
            threads.append(thread)
        time.sleep(SLEEP_TIME)


def process_crawler(args, **kwargs):
    """Start one threaded crawler per CPU, all sharing the same MongoDB queue
    """
    num_cpus = multiprocessing.cpu_count()
    print('Starting {} processes'.format(num_cpus))
    processes = []
    for i in range(num_cpus):
        # args is the seed URL, forwarded to threaded_crawler in each process
        p = multiprocessing.Process(target=threaded_crawler, args=[args], kwargs=kwargs)
        p.start()
        processes.append(p)
    # wait for all processes to complete
    for p in processes:
        p.join()


def normalize(seed_url, link):
    """Normalize this URL by removing the hash fragment and resolving it against the seed domain
    """
    link, _ = urldefrag(link)  # remove hash to avoid duplicates
    return urljoin(seed_url, link)
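

# A minimal usage sketch (not part of the original file): the seed URL and
# callback below are hypothetical placeholders, and MongoCache is assumed to
# need no required constructor arguments.
if __name__ == '__main__':
    def report_progress(url, html):
        # hypothetical callback: return no extra links, just report each page
        print('Downloaded: {}'.format(url))
        return []

    process_crawler('http://example.webscraping.com',
                    cache=MongoCache(),
                    scrape_callback=report_progress,
                    max_threads=5,
                    timeout=10)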
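
The crawler above leans on a handful of MongoQueue behaviours: push ignores URLs it has already seen, pop atomically claims an outstanding URL or raises KeyError, peek reports whether work remains, complete marks a URL done, and truth testing (the `while threads or crawl_queue` condition) stays true while any URL is unfinished. Below is a minimal sketch of that interface, assuming pymongo; the field names and status values are illustrative, and the project's real mongo_queue module may differ.

from datetime import datetime
from pymongo import MongoClient, errors

class MongoQueue:
    OUTSTANDING, PROCESSING, COMPLETE = range(3)

    def __init__(self, client=None):
        self.client = client or MongoClient()
        self.db = self.client.cache

    def __bool__(self):
        # truthy while any URL is outstanding or still being processed
        record = self.db.crawl_queue.find_one({'status': {'$ne': self.COMPLETE}})
        return record is not None

    def push(self, url):
        # insert the URL unless it has been seen before
        try:
            self.db.crawl_queue.insert_one({'_id': url, 'status': self.OUTSTANDING})
        except errors.DuplicateKeyError:
            pass  # already queued, in progress, or complete

    def pop(self):
        # atomically claim an outstanding URL, or raise KeyError when none is left
        record = self.db.crawl_queue.find_one_and_update(
            {'status': self.OUTSTANDING},
            {'$set': {'status': self.PROCESSING, 'timestamp': datetime.now()}})
        if record is None:
            raise KeyError()
        return record['_id']

    def peek(self):
        # return an outstanding URL without claiming it, or None
        record = self.db.crawl_queue.find_one({'status': self.OUTSTANDING})
        return record['_id'] if record else None

    def complete(self, url):
        self.db.crawl_queue.update_one({'_id': url}, {'$set': {'status': self.COMPLETE}})

    def clear(self):
        self.db.crawl_queue.drop()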