While large, curated datasets like Common Crawl provide a significant starting point (as discussed in "Utilizing Common Crawl Data"), there are scenarios where you need more targeted or fresher data than readily available archives offer. Web scraping, the process of automatically extracting data from websites, becomes a necessary tool. However, scaling this process to collect terabytes of text suitable for LLM pre-training introduces substantial engineering challenges beyond simple script execution.
This section focuses on the techniques and considerations required for building and operating web crawlers capable of fetching vast amounts of data efficiently and responsibly.
Web scraping is fundamentally an I/O-bound task. Your crawler spends most of its time waiting for network responses. Using standard synchronous code (like the popular requests library) means each request blocks execution until it completes. This severely limits throughput.
To overcome this, asynchronous programming is essential. Python's asyncio library, combined with HTTP clients like aiohttp, allows your crawler to initiate many network requests concurrently. While one request is waiting for a response, the program can work on initiating others or processing completed ones, drastically improving overall speed.
Here's an example using aiohttp to fetch multiple URLs concurrently:
import asyncio
import time

import aiohttp


async def fetch(session, url):
    """Asynchronously fetches a single URL."""
    print(f"Fetching: {url}")
    try:
        # Set a timeout to avoid hanging indefinitely
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as response:
            # Ensure we only process successful responses
            if response.status == 200:
                # In a real scraper, you'd process the content here
                print(
                    f"Successfully fetched: {url} "
                    f"(Status: {response.status})"
                )
                return await response.text()  # Return the page content
            else:
                print(f"Failed: {url} (Status: {response.status})")
                return None
    except asyncio.TimeoutError:
        print(f"Timeout fetching: {url}")
        return None
    except aiohttp.ClientError as e:
        print(f"Client error fetching {url}: {e}")
        return None


async def main(urls):
    """Manages the concurrent fetching of multiple URLs."""
    # Create a single session for connection pooling
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        # gather runs the tasks concurrently and waits for them all to finish
        results = await asyncio.gather(*tasks)

    # Process results (optional, here just filtering out failures)
    successful_fetches = [r for r in results if r is not None]
    print(
        f"\nFinished fetching. "
        f"Successful: {len(successful_fetches)}/{len(urls)}"
    )


# Example usage
target_urls = [
    "https://example.com",
    "https://httpbin.org/get",
    "https://httpbin.org/delay/1",  # Simulate a slow response
    "https://nonexistent-domain-xyz123.org",  # Simulate connection error
    "https://httpbin.org/status/404",  # Simulate HTTP error
]

if __name__ == "__main__":
    # In a Jupyter/IPython session, where an event loop is already running,
    # use `await main(target_urls)` instead of asyncio.run().
    start_time = time.time()
    asyncio.run(main(target_urls))
    end_time = time.time()
    print(f"Total time: {end_time - start_time:.2f} seconds")
Running this code typically takes slightly longer than the longest individual request (the 1-second delay in this example), rather than the sum of all request times, demonstrating the benefit of concurrency.
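For contrast, a purely sequential version spends roughly the sum of all the response times. The sketch below is illustrative only: it reuses the target_urls list from above with the synchronous requests library so the difference is easy to measure.

import time

import requests


def fetch_all_sequential(urls):
    """Fetches URLs one at a time; total time is roughly the sum of all requests."""
    results = []
    for url in urls:
        try:
            response = requests.get(url, timeout=10)
            results.append(response.text if response.status_code == 200 else None)
        except requests.RequestException as e:
            print(f"Error fetching {url}: {e}")
            results.append(None)
    return results


start = time.time()
fetch_all_sequential(target_urls)  # target_urls from the example above
print(f"Sequential total time: {time.time() - start:.2f} seconds")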
Even with asynchronous operations, a single machine has limits on bandwidth, CPU, and memory. To scrape at the scale needed for LLMs (potentially billions of pages), a distributed architecture is required. Common components include:
- A URL frontier: a central, shared queue holding the URLs waiting to be crawled. Workers pull from it and push newly discovered links back into it.
- Worker nodes: multiple processes or machines running the fetching logic (like the asynchronous aiohttp example, often enhanced using frameworks like Scrapy). They fetch URLs from the queue, download pages, potentially extract links for new URLs (adding them back to the queue), and process/store the content. A minimal sketch of such a worker loop appears after the diagram description below.
- A metadata store: tracks visited URLs, robots.txt rules, and potentially page checksums for deduplication. Databases (SQL or NoSQL) or specialized key-value stores can serve this purpose.

Diagram: A simplified view of a distributed crawling system. Workers fetch URLs from a central queue, process pages, store content, update metadata, and potentially discover new URLs to add back to the queue.
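To make the division of labor concrete, here is a rough, single-process sketch of that worker loop. It is only an approximation: asyncio.Queue, a Python set, and a dict stand in for what would really be a distributed queue, a metadata database, and durable content storage; the crawl_worker/run_crawl names and the regex link extractor are invented for this illustration; fetch is the function from the earlier example.

import asyncio
import re

import aiohttp

# Very rough href extractor; a real crawler would use an HTML parser
HREF_RE = re.compile(r'href="(https?://[^"#]+)"')


async def crawl_worker(frontier, seen_urls, content_store, session):
    """One worker: pull a URL, fetch it, store the page, enqueue new links."""
    while True:
        url = await frontier.get()  # central URL frontier (queue)
        try:
            if url in seen_urls:  # stand-in for the shared metadata store
                continue
            seen_urls.add(url)
            html = await fetch(session, url)  # 'fetch' from the earlier example
            if html is None:
                continue
            content_store[url] = html  # stand-in for durable content storage
            for link in HREF_RE.findall(html):  # no depth limit in this sketch
                if link not in seen_urls:
                    await frontier.put(link)  # feed discovered URLs back in
        finally:
            frontier.task_done()


async def run_crawl(seed_urls, num_workers=4):
    """Starts a pool of workers sharing one queue and one HTTP session."""
    frontier = asyncio.Queue()
    for url in seed_urls:
        frontier.put_nowait(url)
    seen_urls, content_store = set(), {}

    async with aiohttp.ClientSession() as session:
        workers = [
            asyncio.create_task(
                crawl_worker(frontier, seen_urls, content_store, session)
            )
            for _ in range(num_workers)
        ]
        await frontier.join()  # wait until the frontier is drained
        for worker in workers:
            worker.cancel()
        await asyncio.gather(*workers, return_exceptions=True)
    return content_store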
Aggressive scraping can overload websites, impacting their availability for legitimate users and potentially leading to your IP addresses being blocked. Large-scale crawling must be done responsibly.
Most websites provide a /robots.txt file outlining rules for automated agents (bots/crawlers). These rules specify which parts of the site should not be accessed (Disallow) and sometimes suggest a preferred crawl delay. Respecting robots.txt is a fundamental aspect of ethical scraping.
Python's urllib.robotparser can help:
import asyncio
from urllib.parse import urljoin
from urllib.robotparser import RobotFileParser

import aiohttp

# Define your crawler's User-Agent string.
# Be specific and provide contact info if possible.
USER_AGENT = "MyLLMDataCrawler/1.0 (+http://mycrawlerinfo.example.com)"


def can_fetch_url(robot_parser, url):
    """Checks if the URL is allowed by robots.txt for our user agent."""
    try:
        return robot_parser.can_fetch(USER_AGENT, url)
    except Exception as e:
        # Handle potential parsing errors gracefully
        print(f"Error checking robots.txt permission for {url}: {e}")
        return False  # Default to not fetching if unsure


# --- Usage within a crawler ---
# Cache parser objects per domain to avoid re-fetching robots.txt
robots_url_cache = {}


async def check_and_fetch(session, url):
    # Simplified: assumes the URL is well-formed http/https
    base_url = urljoin(url, "/")
    robots_url = urljoin(base_url, "robots.txt")

    if base_url not in robots_url_cache:
        print(f"Fetching robots.txt for {base_url}")
        parser = RobotFileParser()
        parser.set_url(robots_url)
        try:
            # Fetch robots.txt with the shared aiohttp session and hand the
            # lines to the parser (parser.read() would block the event loop).
            timeout = aiohttp.ClientTimeout(total=5)
            async with session.get(robots_url, timeout=timeout) as response:
                if response.status == 200:
                    parser.parse((await response.text()).splitlines())
                else:
                    print(
                        f"No valid robots.txt found for {base_url} "
                        f"(Status: {response.status})"
                    )
                    # A missing robots.txt conventionally means everything
                    # is allowed: parse an empty rule set.
                    parser.parse([])
            # Cache the parser, even if the site had no robots.txt rules
            robots_url_cache[base_url] = parser
        except (asyncio.TimeoutError, aiohttp.ClientError) as e:
            print(f"Failed to fetch or parse robots.txt for {base_url}: {e}")
            robots_url_cache[base_url] = None  # Mark as failed
            return None  # Cannot proceed without checking robots.txt

    parser = robots_url_cache[base_url]
    if parser and can_fetch_url(parser, url):
        print(f"Allowed to fetch: {url}")
        # Proceed with the actual fetch (the 'fetch' function from earlier)
        return await fetch(session, url)
    elif parser:
        print(f"Disallowed by robots.txt: {url}")
        return None
    else:
        # robots.txt fetching failed earlier for this domain
        print(f"Could not verify robots.txt permission for {url}, skipping.")
        return None


# --- Example call within an async context ---
# async with aiohttp.ClientSession(
#         headers={"User-Agent": USER_AGENT}) as session:
#     await check_and_fetch(
#         session, "https://example.com/some/allowed/path")
#     await check_and_fetch(
#         session, "https://example.com/some/disallowed/path")
#     # Assuming the second path is disallowed in robots.txt
Even if allowed by robots.txt, hitting a single server with hundreds or thousands of requests per second can overwhelm it. Implement rate limiting per domain:
- Per-domain delay: introduce await asyncio.sleep(delay_seconds) between requests to the same domain.
- Crawl-delay directive: some robots.txt files suggest a delay. Respect this if present.

A simple per-domain delay mechanism:
import asyncio
import time
from urllib.parse import urlparse

# Store the last access time for each domain
last_access_times = {}
# Minimum delay between requests to the same domain (in seconds)
MIN_DELAY_PER_DOMAIN = 1.0


async def rate_limited_fetch(session, url):
    domain = urlparse(url).netloc
    last_access = last_access_times.get(domain, 0)
    now = time.monotonic()
    elapsed = now - last_access

    if elapsed < MIN_DELAY_PER_DOMAIN:
        wait_time = MIN_DELAY_PER_DOMAIN - elapsed
        print(f"Rate limiting {domain}: Waiting {wait_time:.2f}s")
        await asyncio.sleep(wait_time)

    # Update the last access time *before* making the request. (Strict
    # enforcement under many concurrent tasks per domain would also
    # need a per-domain lock.)
    last_access_times[domain] = time.monotonic()

    # Now perform the actual fetch
    # (using a function like 'fetch' or 'check_and_fetch' from earlier)
    result = await fetch(session, url)  # Assume 'fetch' handles exceptions
    return result
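If a site's robots.txt specifies a Crawl-delay, the cached RobotFileParser can report it via crawl_delay(). A small sketch (the delay_for_domain helper is illustrative, reusing USER_AGENT and MIN_DELAY_PER_DOMAIN from the examples above) that honors whichever delay is larger:

def delay_for_domain(parser, default_delay=MIN_DELAY_PER_DOMAIN):
    """Returns the larger of our default delay and the site's Crawl-delay."""
    if parser is None:
        return default_delay
    crawl_delay = parser.crawl_delay(USER_AGENT)  # None if not specified
    if crawl_delay is None:
        return default_delay
    return max(default_delay, float(crawl_delay))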
Always set a clear User-Agent string that identifies your crawler and ideally provides a way to contact you (e.g., a URL with information or an email address). This helps site administrators understand the traffic source and contact you if your crawler causes problems. Avoid using generic browser user agents.
Example: User-Agent: LLMBuilderBot/0.1 (+http://www.my-llm-project.org/crawler-info)
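With aiohttp, a convenient way to apply this consistently is to set the header once when the session is created, so every request carries it. A brief sketch (the polite_crawl wrapper is illustrative; rate_limited_fetch is the helper defined above):

import aiohttp

USER_AGENT = "LLMBuilderBot/0.1 (+http://www.my-llm-project.org/crawler-info)"


async def polite_crawl(urls):
    # Every request made through this session sends the identifying header
    headers = {"User-Agent": USER_AGENT}
    async with aiohttp.ClientSession(headers=headers) as session:
        for url in urls:
            await rate_limited_fetch(session, url)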
Beyond politeness, large-scale scraping presents other technical hurdles. A common one is dynamic content: many pages render their text with client-side JavaScript, so a plain HTTP fetch returns little usable content. For these, tools like Playwright or Selenium, which control a real browser engine, are needed. However, they are significantly more resource-intensive (CPU, memory) than simple HTTP clients, slowing down crawling and increasing costs. Use them selectively, only when necessary.
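As a rough illustration of that trade-off, the sketch below uses Playwright's async API to render a page in headless Chromium and return the final HTML. The fetch_rendered_html name and the example URL are placeholders, and Playwright plus a browser build must be installed separately.

# pip install playwright && playwright install chromium
import asyncio

from playwright.async_api import async_playwright


async def fetch_rendered_html(url):
    """Renders a page in headless Chromium and returns the final HTML."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        try:
            # Wait for network activity to settle so JS-injected content loads
            await page.goto(url, wait_until="networkidle", timeout=15000)
            return await page.content()
        finally:
            await browser.close()


# asyncio.run(fetch_rendered_html("https://example.com"))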
Building a scalable and robust web scraper is a significant software engineering task. While libraries and frameworks provide building blocks, careful design considering concurrency, distribution, storage, politeness, and error handling is necessary to successfully gather the vast amounts of data needed for training large language models. Remember that data quality is also paramount; the raw output of scraping often requires extensive cleaning and filtering, as discussed in Chapter 7 ("Data Cleaning and Preprocessing Pipelines").