Introduction
Design and implement a multithreaded or asynchronous web crawler program that can crawl all web links under the same host starting from a specified URL.
A Blocking Solution
Let’s put aside efficiency and scalability for now and focus on an MVP that works. Given a start URL, we will use requests and BeautifulSoup to make an HTTP request and parse the page for additional links. We use a queue to hold the newly discovered links. The program runs until there are no more links to traverse.
Limitations of Blocking I/O in Web Crawling
- Inefficiency and Wasted Resources: In a blocking I/O model, when a request is made (e.g., fetching a webpage), the program pauses and waits for the response before proceeding to the next task. This means the CPU remains idle during the entire I/O operation (like network latency), leading to significant wasted computing resources, especially when dealing with many requests.
- Impact of I/O Wait Times: Web crawling is inherently I/O-bound. Operations like making HTTP requests, waiting for server responses, and writing to disk involve considerable delays. In a blocking setup, these wait times add up sequentially, causing the overall crawling process to be extremely slow. For instance, if one request takes 5 seconds, the next request cannot even start until those 5 seconds have passed.
- Poor Scalability: Due to the sequential nature of blocking I/O, scaling up to crawl a large number of URLs becomes impractical. Each request must complete before the next can begin, making it impossible to utilize available network bandwidth and CPU efficiently for concurrent operations. This drastically limits the rate at which URLs can be processed, making it unsuitable for large-scale crawling tasks that involve millions or billions of pages.
- Difficulty in Concurrency: Achieving true concurrency with blocking I/O typically requires multiple threads or processes, each managing its own blocking operation. While this can offer some parallelism, it introduces overheads related to thread/process management, context switching, and inter-process communication, which can be complex to manage and less efficient than asynchronous approaches for I/O-bound tasks.
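To make the cost concrete before looking at the full crawler, here is a small sketch (not part of the crawler itself; the URLs are placeholders) that times a handful of sequential blocking requests. The total is roughly the sum of the individual round trips, because nothing else runs while each call waits:

import time
import requests

urls = ["https://example.com", "https://example.org", "https://example.net"]

start = time.perf_counter()
for url in urls:
    # Each call blocks until the full response arrives; the CPU idles meanwhile
    requests.get(url, timeout=10)
elapsed = time.perf_counter() - start

print(f"Fetched {len(urls)} pages sequentially in {elapsed:.2f}s")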
import requests
import urllib.parse
from bs4 import BeautifulSoup

class WebCrawler:
    def __init__(self, start_url: str):
        self.start_url = start_url
        self.visited_urls = set()
        self.queue = [start_url]  # Use a list as a basic queue for the blocking crawler

        parsed_url = urllib.parse.urlparse(start_url)
        self.allowed_domains = {parsed_url.netloc}
        print(f"WebCrawler (Blocking) initialized with start_url: {self.start_url}, allowed_domains: {self.allowed_domains}")

    def _fetch_url(self, url: str) -> str | None:
        try:
            print(f"Fetching URL (blocking): {url}")
            response = requests.get(url, timeout=10)
            if response.status_code == 200:
                return response.text
            else:
                print(f"Error fetching {url}: Status code {response.status_code}")
                return None
        except requests.exceptions.RequestException as e:
            print(f"Error fetching {url}: {e}")
            return None

    def _get_links(self, html_content: str, base_url: str) -> list[str]:
        if html_content is None:
            return []

        links = []
        soup = BeautifulSoup(html_content, 'html.parser')
        for link_tag in soup.find_all('a'):
            href = link_tag.get('href')
            if href:
                normalized_url = urllib.parse.urljoin(base_url, href)
                parsed_normalized_url = urllib.parse.urlparse(normalized_url)

                is_http_https = parsed_normalized_url.scheme in ('http', 'https')
                is_allowed_domain = parsed_normalized_url.netloc in self.allowed_domains

                if is_http_https and is_allowed_domain:
                    links.append(normalized_url)
        return links

    def crawl(self) -> set[str]:
        head = 0
        while head < len(self.queue):
            url = self.queue[head]
            head += 1

            if url in self.visited_urls:
                continue

            print(f"Processing URL (blocking): {url}")
            self.visited_urls.add(url)

            html_content = self._fetch_url(url)
            if html_content:
                new_links = self._get_links(html_content, url)
                for link in new_links:
                    if link not in self.visited_urls and link not in self.queue:
                        print(f"Discovered new link (blocking): {link}")
                        self.queue.append(link)

        print("Crawling (blocking) finished.")
        return self.visited_urls

# Instantiate and run the blocking crawler
blocking_start_url = "https://example.com"
blocking_crawler = WebCrawler(blocking_start_url)
blocking_visited_urls = blocking_crawler.crawl()

print("\n--- Visited URLs (Blocking Crawler) ---")
for url in sorted(list(blocking_visited_urls)):
    print(url)
WebCrawler (Blocking) initialized with start_url: https://example.com, allowed_domains: {'example.com'}
Processing URL (blocking): https://example.com
Fetching URL (blocking): https://example.com
Crawling (blocking) finished.

--- Visited URLs (Blocking Crawler) ---
https://example.com
An Efficient Solution Using Coroutines
The single-threaded implementation above has many limitations due to its blocking execution. Only one thread is created, every request to the server blocks the code, and the CPU sits idle until the network call finishes and data is received from the server. The code is slow and impossible to use in practice for millions of URLs.
Using coroutines can improve performance by allowing different tasks (fetching a page, parsing links from the page) to run concurrently. Note that this is not parallelism: there is still one thread, but multiple coroutines that asyncio schedules on the event loop, so the CPU does not sit idle while waiting on the network.
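Before the full crawler, here is the core pattern in isolation: a minimal sketch (the URLs are placeholders) that fetches a fixed list of pages concurrently with aiohttp and asyncio.gather on a single thread. Whenever one coroutine is waiting on the network, the event loop runs another:

import asyncio
import aiohttp

async def fetch(session, url):
    # The coroutine suspends at each await; the event loop runs other fetches meanwhile
    async with session.get(url) as response:
        body = await response.text()
        print(f"{url}: {len(body)} bytes")

async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        # gather() schedules all fetches concurrently on the same thread
        await asyncio.gather(*(fetch(session, u) for u in urls))

asyncio.run(fetch_all(["https://example.com", "https://example.org"]))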
import asyncio
import aiohttp
import urllib.parse
from bs4 import BeautifulSoup

# nest_asyncio lets asyncio.run() work inside an already-running loop (e.g. a notebook)
import nest_asyncio
nest_asyncio.apply()

class WebCrawler:
    def __init__(self, start_url, concurrency_limit):
        self.start_url = start_url
        self.visited_urls = set()
        self.concurrency_limit = concurrency_limit

        self.queue = asyncio.Queue()
        self.queue.put_nowait(start_url)

        self.session = aiohttp.ClientSession()

        self.semaphore = asyncio.Semaphore(concurrency_limit)

        parsed_url = urllib.parse.urlparse(start_url)
        self.allowed_domains = {parsed_url.netloc}
        print(f"WebCrawler initialized with start_url: {self.start_url}, concurrency_limit: {self.concurrency_limit}, allowed_domains: {self.allowed_domains}")

    async def fetch_url(self, url: str):
        async with self.semaphore:
            try:
                print(f"Fetching URL: {url}")
                async with self.session.get(url, timeout=10) as response:
                    if response.status == 200:
                        # response.text() is a coroutine in aiohttp; it must be awaited
                        return await response.text()
                    else:
                        print(f"Error fetching {url}: Status code {response.status}")
                        return None
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                print(f"Error fetching {url}: {e}")
                return None

    async def get_links(self, html_content, base_url):
        if html_content is None:
            return

        soup = BeautifulSoup(html_content, "html.parser")
        for link_tag in soup.find_all('a'):
            href = link_tag.get("href")
            if href:
                normalized_url = urllib.parse.urljoin(base_url, href)
                parsed_norm_url = urllib.parse.urlparse(normalized_url)

                is_http_https = parsed_norm_url.scheme in ("http", "https")
                is_allowed_domain = parsed_norm_url.netloc in self.allowed_domains

                if is_http_https and is_allowed_domain:
                    yield normalized_url

    async def process_url(self, url):
        print(f"Processing URL: {url}")

        self.visited_urls.add(url)

        html_content = await self.fetch_url(url)
        if html_content:
            async for link in self.get_links(html_content, url):
                if link not in self.visited_urls:
                    print(f"Discovered a new link: {link}")
                    # Mark the link as seen before enqueueing so it cannot be queued twice
                    self.visited_urls.add(link)
                    await self.queue.put(link)

    async def crawl(self):
        workers = []
        for _ in range(self.concurrency_limit):
            task = asyncio.create_task(self.worker())
            workers.append(task)

        await self.queue.join()

        for work in workers:
            work.cancel()

        await asyncio.gather(*workers, return_exceptions=True)

        await self.session.close()  # Close the aiohttp client session
        print("Crawling finished.")

    async def worker(self):
        while True:
            try:
                url = await self.queue.get()
                await self.process_url(url)
                self.queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"Worker encountered an error: {e}")
                self.queue.task_done()

async def main():
    start_url = "https://example.com"
    crawler = WebCrawler(start_url, concurrency_limit=5)

    await crawler.crawl()

    print("\n--- Visited URLs ---")
    for url in sorted(list(crawler.visited_urls)):
        print(url)
    return sorted(list(crawler.visited_urls))

visited_urls_list = asyncio.run(main())
WebCrawler initialized with start_url: https://example.com, concurrency_limit: 5, allowed_domains: {'example.com'}
Processing URL: https://example.com
Fetching URL: https://example.com
Crawling finished.

--- Visited URLs ---
https://example.com
Follow-up 1: How to support multi-threading for CPU-bound tasks?
Parsing HTML with BeautifulSoup is CPU-bound work; if it runs directly inside a coroutine, it blocks the event loop. One option is to keep the parser as a plain synchronous method and offload it to a thread:
def parse_links_sync_bs4(self, html_content, base_url):
    if html_content is None:
        return []
    links = []
    soup = BeautifulSoup(html_content, "html.parser")
    for tag in soup.find_all("a"):
        href = tag.get("href")
        if not href:
            continue
        normalized = urllib.parse.urljoin(base_url, href)
        p = urllib.parse.urlparse(normalized)
        if p.scheme in ("http", "https") and p.netloc in self.allowed_domains:
            links.append(normalized)
    return links
Then call it from process_url(), using asyncio.to_thread() to run the blocking parse in a worker thread while the event loop keeps serving other coroutines:
html = await self.fetch_url(url)
if html:
    links = await asyncio.to_thread(self.parse_links_sync_bs4, html, url)
    for link in links:
        if link not in self.visited_urls:
            await self.queue.put(link)
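Note that asyncio.to_thread() only moves the parse off the event loop; threads still share the GIL, so heavy parsing does not run in parallel. If parsing truly dominates, one alternative (a sketch, not part of the crawler above; the helper name parse_links_in_process and max_workers=4 are illustrative) is a process pool via loop.run_in_executor(). The parse function must then be a picklable module-level function:

import asyncio
import urllib.parse
from concurrent.futures import ProcessPoolExecutor
from bs4 import BeautifulSoup

# Module-level function so it can be pickled and shipped to a worker process
def parse_links_in_process(html_content, base_url, allowed_domains):
    links = []
    soup = BeautifulSoup(html_content, "html.parser")
    for tag in soup.find_all("a"):
        href = tag.get("href")
        if not href:
            continue
        normalized = urllib.parse.urljoin(base_url, href)
        p = urllib.parse.urlparse(normalized)
        if p.scheme in ("http", "https") and p.netloc in allowed_domains:
            links.append(normalized)
    return links

async def parse_in_pool(executor, html_content, base_url, allowed_domains):
    # run_in_executor hands the CPU-bound parse to another process, sidestepping the GIL
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        executor, parse_links_in_process, html_content, base_url, allowed_domains
    )

async def main():
    executor = ProcessPoolExecutor(max_workers=4)
    html = '<a href="/about">About</a><a href="https://other.org/x">Other</a>'
    links = await parse_in_pool(executor, html, "https://example.com", {"example.com"})
    print(links)  # ['https://example.com/about']
    executor.shutdown()

asyncio.run(main())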
Thoughts
At first, the use of coroutines is quite counter-intuitive. My brain mostly reads imperative code as if it executes strictly in order, so it was hard to believe that a program can be paused in the middle and resumed automatically later, and that the result is a faster program. The only thing you need is the “syntactic sugar” of async and await.
From now on, you should internalize this model: when you see await func() in a program, read it as “pause the current execution here, let func() run, and once there is a chance to resume, the paused code will resume automatically.”
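To make the pause-and-resume behavior concrete, here is a minimal sketch (the coroutine name step is illustrative). Both coroutines run on one thread; their prints interleave because each await hands control back to the event loop:

import asyncio

async def step(name, delay):
    print(f"{name}: started")
    # Execution pauses here; the event loop is free to run the other coroutine
    await asyncio.sleep(delay)
    print(f"{name}: resumed and finished")

async def demo():
    await asyncio.gather(step("A", 1), step("B", 1))

asyncio.run(demo())  # prints both "started" lines first, then both "finished" lines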
There are still lots of questions here:
- When exactly does the paused program get resumed?
- What if func() also has an await statement?
- Where is the state of the execution stored while the program is paused?
- What are the primitives used to build coroutines (awaitables)?
- How did Python implement all of this as part of the language; shouldn’t it be a system-level feature?
Well, it is a rabbit hole if you keep asking these questions. The behind-the-scenes story is very interesting, actually one of the most interesting (at least to me) tech topics out there. If you are interested, please read the following materials:
- Python official quick start guide: A Conceptual Overview of asyncio
- In-depth version of the above guide: A conceptual overview of the ideas and objects which power asyncio
- Asyncio tutorial by Łukasz Langa from the EdgeDB YouTube tutorial series
- Coroutine and memory model overview: 500 Lines or Less: A Web Crawler With asyncio Coroutines
- Extended discussion of async context managers and iterators: Python Asyncio Series
Further improvement
Efficiency
- Add support for multi-threading in addition to asyncio for CPU-bound tasks.
- Implement caching to avoid re-fetching unchanged content.
- Implement depth limiting to control how deep the crawler goes (see the sketch after this list).
- Add more sophisticated link filtering (e.g., exclude certain paths or query parameters).
- Add support for crawling sitemaps to discover URLs more efficiently.
- Implement a mechanism to detect and avoid crawling duplicate content.
- Implement a mechanism to filter out unwanted content (e.g., ads, pop-ups) during crawling.
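For the depth-limiting item above, a minimal sketch of the idea (the helper signature and MAX_DEPTH = 3 are illustrative, not part of the crawlers above): store the depth alongside each queued URL and stop expanding past the limit.

from collections import deque

MAX_DEPTH = 3  # illustrative limit on how far from the start URL links are followed

def crawl_with_depth_limit(start_url, fetch, get_links):
    # fetch(url) -> html or None; get_links(html, url) -> list of same-host links
    visited = set()
    queue = deque([(start_url, 0)])  # keep the depth alongside each URL
    while queue:
        url, depth = queue.popleft()
        if url in visited or depth > MAX_DEPTH:
            continue
        visited.add(url)
        html = fetch(url)
        if html:
            for link in get_links(html, url):
                if link not in visited:
                    queue.append((link, depth + 1))
    return visited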
Ensure politeness
- Implement rate limiting to avoid overwhelming target servers.
- Implement a mechanism to handle and respect rate limits set by target servers.
- Add support for robots.txt to respect crawling rules (see the sketch after this list).
- Implement a user-agent header to identify the crawler.
- Add support for proxy servers to route requests through different IPs.
- Add support for crawling password-protected pages (e.g., using HTTP authentication).
- Implement a mechanism to handle and respect cookies during crawling.
- Implement a mechanism to detect and handle CAPTCHAs encountered during crawling.
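For the robots.txt and user-agent items above, a minimal sketch using the standard library's urllib.robotparser (the user-agent string and URL are placeholders; a real crawler would cache the parsed robots.txt per host instead of refetching it for every URL):

import urllib.parse
import urllib.robotparser
import requests

USER_AGENT = "MyCrawler/0.1 (+https://example.com/bot)"  # illustrative identifier

def allowed_by_robots(url, user_agent=USER_AGENT):
    # Fetch and parse robots.txt for the URL's host, then check the rule for this path
    parts = urllib.parse.urlparse(url)
    robots_url = f"{parts.scheme}://{parts.netloc}/robots.txt"
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(robots_url)
    rp.read()
    return rp.can_fetch(user_agent, url)

url = "https://example.com/some/page"
if allowed_by_robots(url):
    response = requests.get(url, headers={"User-Agent": USER_AGENT}, timeout=10)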
Improve usability
- Add command-line arguments for configuration (e.g., start URL, concurrency limit).
- Add logging instead of print statements for better monitoring and debugging.
- Implement retries for transient network errors.
- Implement a mechanism to back off and retry when encountering server errors (e.g., 5xx status codes); see the sketch after this list.
- Implement a mechanism to pause and resume crawling.
- Implement a GUI or web interface to monitor the crawling process in real-time.
- Implement detailed error handling and reporting for different types of failures.
- Implement a mechanism to track and limit the total number of pages crawled.
- Implement a mechanism to visualize the crawl graph (e.g., using network graphs).
- Add support for scheduling crawls at specific intervals or times.
- Implement a mechanism to monitor and log performance metrics (e.g., crawl speed, success rate).
- Add support for crawling mobile versions of websites.
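For the retry and back-off items above, a sketch of a helper (hypothetical, not part of the crawler; names and defaults are illustrative) that retries transient failures, meaning network errors, timeouts, and 5xx responses, with exponential backoff. In the async crawler it could replace the body of fetch_url():

import asyncio
import aiohttp

async def fetch_with_retries(session, url, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries + 1):
        try:
            async with session.get(url, timeout=10) as response:
                if response.status == 200:
                    return await response.text()
                if response.status < 500:
                    return None  # 4xx and other non-retryable statuses: give up
                # 5xx: fall through and retry after a delay
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"Attempt {attempt + 1} for {url} failed: {e}")
        if attempt < max_retries:
            # Back off 1s, 2s, 4s, ... before the next attempt
            await asyncio.sleep(base_delay * (2 ** attempt))
    return None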
Enhanced features
- Handle different content types (e.g., images, PDFs) appropriately.
- Store results in a database or file for later analysis.
- Add support for crawling dynamic content loaded via JavaScript (e.g., using headless browsers).
- Add support for different output formats (e.g., JSON, CSV) for storing crawled data.
- Implement a mechanism to handle redirects appropriately.
- Add support for crawling APIs and handling JSON responses.
Testability
- Implement unit tests to ensure the crawler behaves as expected; a minimal sketch follows.
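For the testability item, a minimal unittest sketch, assuming the blocking WebCrawler class defined earlier is importable; it exercises only _get_links, which needs no network access:

import unittest

class TestGetLinks(unittest.TestCase):
    def test_keeps_same_host_links_and_drops_external_ones(self):
        crawler = WebCrawler("https://example.com")  # blocking crawler from above
        html = (
            '<a href="/about">About</a>'
            '<a href="https://other.org/page">External</a>'
        )
        links = crawler._get_links(html, "https://example.com")
        self.assertEqual(links, ["https://example.com/about"])

if __name__ == "__main__":
    unittest.main()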