趋近智
像 Common Crawl 这样的大型精选数据集提供了很好的起点。然而,在某些情况下,您需要比现有归档数据更具针对性或更新的数据。网页抓取,即从网站自动提取数据的过程,成为一种必要手段。将此过程扩展到收集适用于大型语言模型预训练的数TB文本,会带来相当大的工程挑战。
本节侧重于构建和运行能够高效且负责任地获取海量数据的网页爬虫所需的技术和注意事项。
网页抓取本质上是一个I/O密集型任务。您的爬虫大部分时间都在等待网络响应。使用标准同步代码(如流行的 requests 库)意味着每个请求都会阻塞执行,直到它完成。这会严重限制吞吐量。
为解决此问题,异步编程必不可少。Python 的 asyncio 库,结合 aiohttp 等 HTTP 客户端,允许您的爬虫同时发起多个网络请求。当一个请求等待响应时,程序可以开始其他请求或处理已完成的请求,从而大幅提升整体速度。
这是一个使用 aiohttp 同时获取多个URL的例子:
import asyncio
import aiohttp
import time
async def fetch(session, url):
"""异步获取单个URL。"""
print(f"正在获取: {url}")
try:
# 设置超时以避免无限期等待
async with session.get(url, timeout=10) as response:
# 确保只处理成功的响应
if response.status == 200:
# 在真实的爬虫中,您会在这里处理内容
# content = await response.text()
# print(f"已从 {url} 获取内容,长度: {len(content)}")
print(
f"成功获取: {url} "
f"(状态: {response.status})"
)
return await response.text() # 返回内容或状态
else:
print(
f"失败: {url} (状态: {response.status})"
)
return None
except asyncio.TimeoutError:
print(f"获取 {url} 超时")
return None
except aiohttp.ClientError as e:
print(f"获取 {url} 时发生客户端错误: {e}")
return None
async def main(urls):
"""管理多个URL的并发获取。"""
# 创建一个会话用于连接池
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
# gather 并发运行任务并等待所有任务完成
results = await asyncio.gather(*tasks)
# 处理结果(可选,此处仅过滤掉None)
successful_fetches = [r for r in results if r is not None]
print(
f"\n获取完成。 "
f"成功: {len(successful_fetches)}/{len(urls)}"
)
# 示例用法
target_urls = [
"https://example.com",
"https://httpbin.org/get",
"https://httpbin.org/delay/1", # 模拟慢响应
"https://nonexistent-domain-xyz123.org", # 模拟连接错误
"https://httpbin.org/status/404" # 模拟HTTP错误
]
start_time = time.time()
# 在Jupyter Notebook或脚本中,运行asyncio事件循环
# asyncio.run(main(target_urls)) # 在标准Python脚本中使用此行
# 如果在已运行事件循环的环境中运行(例如Jupyter):
# await main(target_urls) # 在支持await的Jupyter/IPython中使用此行
# 为演示目的,我们在此直接运行
if __name__ == "__main__":
asyncio.run(main(target_urls))
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")
运行此代码通常比最长的单个请求所需时间略长(此示例中的1秒延迟),而不是所有请求时间的总和,这体现了并发的好处。
即使有异步操作,单台机器在带宽、CPU和内存方面也有局限。要达到大型语言模型所需的抓取规模(可能数十亿页面),需要分布式架构。常见组件包括:
aiohttp 示例,通常使用 Scrapy 等框架进行增强)。它们从队列中获取URL,下载页面,可能提取新URL的链接(将其添加回队列),并处理/存储内容。robots.txt 规则,以及可能用于去重化处理的页面校验和。数据库(SQL或NoSQL)或专用键值存储可以实现此目的。一个分布式爬取系统的简化图。工作节点从中央队列获取URL,处理页面,存储内容,更新元数据,并可能发现新的URL并将其添加回队列。
激进的抓取可能导致网站过载,影响其对合法用户的可用性,并可能导致您的IP地址被封锁。大规模爬取必须负责任地进行。
大多数网站提供一个 /robots.txt 文件,概述了对自动化代理(机器人/爬虫)的规则。这些规则指定了网站的哪些部分不应被访问(Disallow),有时会建议一个推荐的爬取延迟。遵守 robots.txt 是道德抓取的一个基本方面。
Python 的 urllib.robotparser 可以提供帮助:
from urllib.robotparser import RobotFileParser
from urllib.parse import urljoin
# 定义爬虫的User-Agent字符串
# 尽可能具体并提供联系方式
USER_AGENT = "MyLLMDataCrawler/1.0 (+http://mycrawlerinfo.example.com)"
def can_fetch_url(robot_parser, url):
"""检查URL是否被robots.txt允许,供我们的用户代理使用。"""
try:
return robot_parser.can_fetch(USER_AGENT, url)
except Exception as e:
# 优雅地处理潜在的解析错误
print(f"检查 {url} 的robots.txt权限时出错: {e}")
return False # 不确定时默认为不抓取
# --- 爬虫内部使用示例 ---
robots_url_cache = {}
# 缓存每个域的解析器对象,避免重复获取
async def check_and_fetch(session, url):
# 简化:假设URL是格式正确的http/https
base_url = urljoin(url, '/')
robots_url = urljoin(base_url, 'robots.txt')
parser = robots_url_cache.get(base_url)
if not parser:
print(f"正在获取 {base_url} 的robots.txt")
parser = RobotFileParser()
parser.set_url(robots_url)
try:
# 在真实的异步爬虫中,这里应该使用异步请求
# 为简单起见,这里使用了read(),这在异步代码中可能会阻塞
# 在aiohttp中,您会获取robots_url,然后调用parser.parse()
# await parser.read()
# # 理想的异步方式(需要对解析器进行异步适配)
# --- 为示例清晰起见进行同步模拟 ---
import requests
response = requests.get(
robots_url,
timeout=5,
headers={'User-Agent': USER_AGENT}
)
if response.status_code == 200:
parser.parse(response.text.splitlines())
else:
print(
f"未找到 {base_url} 的有效robots.txt "
f"(状态: {response.status_code})"
)
# --- 同步模拟结束 ---
robots_url_cache[base_url] = parser
# 即使获取失败或没有规则也进行缓存
except Exception as e:
print(f"获取或解析 {base_url} 的robots.txt失败: {e}")
robots_url_cache[base_url] = None # 标记为失败
return None # 未能检查robots.txt,无法继续
if parser and can_fetch_url(parser, url):
print(f"允许获取: {url}")
# 使用aiohttp(前面定义的fetch函数)进行实际获取
return await fetch(session, url)
elif parser:
print(f"被robots.txt禁止: {url}")
return None
else:
# 处理robots.txt获取早前失败的情况
print(f"无法验证 {url} 的robots.txt权限,跳过。")
return None
# --- 异步环境中的示例调用 ---
# async with aiohttp.ClientSession(
# headers={'User-Agent': USER_AGENT}) as session:
# await check_and_fetch(
# session, "https://example.com/some/allowed/path")
# await check_and_fetch(
# session, "https://example.com/some/disallowed/path")
# # 假设在robots.txt中被禁止
即使 robots.txt 允许,每秒向单个服务器发送数百或数千个请求也可能使其不堪重负。对每个域名实施频率限制。
await asyncio.sleep(delay_seconds)。Crawl-delay 指令: 有些 robots.txt 文件会建议一个延迟。如果存在,请遵守。一个简单的按域名延迟机制:
import asyncio
import time
# 存储每个域名的最后访问时间
last_access_times = {}
# 对同一域名请求之间的最小延迟(秒)
MIN_DELAY_PER_DOMAIN = 1.0
async def rate_limited_fetch(session, url):
# 需要 'from urllib.parse import urlparse'
domain = urlparse(url).netloc
last_access = last_access_times.get(domain, 0)
now = time.monotonic()
elapsed = now - last_access
if elapsed < MIN_DELAY_PER_DOMAIN:
wait_time = MIN_DELAY_PER_DOMAIN - elapsed
print(f"对 {domain} 进行频率限制: 等待 {wait_time:.2f} 秒")
await asyncio.sleep(wait_time)
# 在发起请求*之前*更新最后访问时间
last_access_times[domain] = time.monotonic()
# 现在执行实际的获取操作
# (使用 'fetch' 或 'check_and_fetch' 等函数)
result = await fetch(session, url) # 假设 'fetch' 处理异常
return result
始终设置清晰的 User-Agent 字符串,以标识您的爬虫,理想情况下提供联系方式(例如,带有信息或电子邮件地址的URL)。这有助于网站管理员了解流量来源,并在您的爬虫导致问题时与您联系。避免使用通用浏览器用户代理。
示例:User-Agent: LLMBuilderBot/0.1 (+http://www.my-llm-project.org/crawler-info)
除了礼貌爬取之外,大规模抓取还会带来其他技术障碍:
Playwright 或 Selenium 这样控制真实浏览器引擎的工具。然而,它们比简单的HTTP客户端资源消耗明显更高(CPU、内存),会减缓爬取速度并增加成本。仅在必要时选择性使用它们。构建一个可伸缩的网页抓取器是一项重要的软件工程任务。虽然库和框架提供了构建模块,但仔细的设计,考虑并发、分布式、存储、礼貌性和错误处理,对于成功收集训练大型语言模型所需的庞大数据是必要的。请记住,数据质量也十分重要;抓取的原始输出通常需要大量清洗和过滤,如第7章(“数据清洗和预处理流程”)所述。
这部分内容有帮助吗?
© 2026 ApX Machine Learning用心打造