像 Common Crawl 这样的大型精选数据集提供了很好的起点。然而,在某些情况下,您需要比现有归档数据更具针对性或更新的数据。网页抓取,即从网站自动提取数据的过程,成为一种必要手段。将此过程扩展到收集适用于大型语言模型预训练的数TB文本,会带来相当大的工程挑战。本节侧重于构建和运行能够高效且负责任地获取海量数据的网页爬虫所需的技术和注意事项。异步操作以提高I/O效率网页抓取本质上是一个I/O密集型任务。您的爬虫大部分时间都在等待网络响应。使用标准同步代码(如流行的 requests 库)意味着每个请求都会阻塞执行,直到它完成。这会严重限制吞吐量。为解决此问题,异步编程必不可少。Python 的 asyncio 库,结合 aiohttp 等 HTTP 客户端,允许您的爬虫同时发起多个网络请求。当一个请求等待响应时,程序可以开始其他请求或处理已完成的请求,从而大幅提升整体速度。这是一个使用 aiohttp 同时获取多个URL的例子:import asyncio import aiohttp import time async def fetch(session, url): """异步获取单个URL。""" print(f"正在获取: {url}") try: # 设置超时以避免无限期等待 async with session.get(url, timeout=10) as response: # 确保只处理成功的响应 if response.status == 200: # 在真实的爬虫中,您会在这里处理内容 # content = await response.text() # print(f"已从 {url} 获取内容,长度: {len(content)}") print( f"成功获取: {url} " f"(状态: {response.status})" ) return await response.text() # 返回内容或状态 else: print( f"失败: {url} (状态: {response.status})" ) return None except asyncio.TimeoutError: print(f"获取 {url} 超时") return None except aiohttp.ClientError as e: print(f"获取 {url} 时发生客户端错误: {e}") return None async def main(urls): """管理多个URL的并发获取。""" # 创建一个会话用于连接池 async with aiohttp.ClientSession() as session: tasks = [fetch(session, url) for url in urls] # gather 并发运行任务并等待所有任务完成 results = await asyncio.gather(*tasks) # 处理结果(可选,此处仅过滤掉None) successful_fetches = [r for r in results if r is not None] print( f"\n获取完成。 " f"成功: {len(successful_fetches)}/{len(urls)}" ) # 示例用法 target_urls = [ "https://example.com", "https://httpbin.org/get", "https://httpbin.org/delay/1", # 模拟慢响应 "https://nonexistent-domain-xyz123.org", # 模拟连接错误 "https://httpbin.org/status/404" # 模拟HTTP错误 ] start_time = time.time() # 在Jupyter Notebook或脚本中,运行asyncio事件循环 # asyncio.run(main(target_urls)) # 在标准Python脚本中使用此行 # 如果在已运行事件循环的环境中运行(例如Jupyter): # await main(target_urls) # 在支持await的Jupyter/IPython中使用此行 # 为演示目的,我们在此直接运行 if __name__ == "__main__": asyncio.run(main(target_urls)) end_time = time.time() print(f"总耗时: {end_time - start_time:.2f} 秒")运行此代码通常比最长的单个请求所需时间略长(此示例中的1秒延迟),而不是所有请求时间的总和,这体现了并发的好处。分布式爬虫架构即使有异步操作,单台机器在带宽、CPU和内存方面也有局限。要达到大型语言模型所需的抓取规模(可能数十亿页面),需要分布式架构。常见组件包括:URL 前沿/队列: 一个管理待爬取URL列表的系统。这需要处理可能数十亿条目,优先处理URL,并防止重复爬取最近访问过的页面。常用消息队列如 RabbitMQ、Kafka,甚至是 Redis 列表。工作节点从队列中获取URL批次。爬虫工作节点: 多个进程或机器,每个都运行一个异步爬虫实例(如 aiohttp 示例,通常使用 Scrapy 等框架进行增强)。它们从队列中获取URL,下载页面,可能提取新URL的链接(将其添加回队列),并处理/存储内容。内容存储: 用于原始HTML或提取文本的可伸缩存储系统。云对象存储(如 AWS S3、Google Cloud Storage)或分布式文件系统(HDFS)是合适的选择(第8章会更详细讨论)。元数据存储: 通常需要用于追踪已访问URL、爬取时间戳、robots.txt 规则,以及可能用于去重化处理的页面校验和。数据库(SQL或NoSQL)或专用键值存储可以实现此目的。digraph G { rankdir=LR; node [shape=box, style=rounded, fontname="Arial", fontsize=10, color="#495057", fillcolor="#e9ecef", style=filled]; edge [fontname="Arial", fontsize=9, color="#868e96"]; subgraph cluster_queue { label = "URL 管理"; bgcolor="#e9ecef"; URL_Queue [label="URL 队列\n(例如, Kafka, Redis)"]; Metadata_DB [label="元数据数据库\n(已访问URL, 时间戳)"]; } subgraph cluster_workers { label = "爬取工作节点"; bgcolor="#dee2e6"; Worker1 [label="爬虫工作节点 1\n(异步获取器)", shape= Mrecord, fillcolor="#a5d8ff"]; Worker2 [label="爬虫工作节点 2\n(异步获取器)", shape= Mrecord, fillcolor="#a5d8ff"]; WorkerN [label="爬虫工作节点 N\n(异步获取器)", shape= Mrecord, fillcolor="#a5d8ff"]; } subgraph cluster_storage { label = "数据存储"; bgcolor="#ced4da"; Data_Store [label="原始/处理过的数据\n(例如, S3, HDFS)"]; } URL_Queue -> Worker1 [label=" URL 批次"]; URL_Queue -> Worker2 [label=" URL 批次"]; URL_Queue -> WorkerN [label=" URL 批次"]; Worker1 -> URL_Queue [label=" 发现新URL", style=dashed]; Worker2 -> URL_Queue [label=" 发现新URL", style=dashed]; WorkerN -> URL_Queue [label=" 发现新URL", style=dashed]; Worker1 -> Metadata_DB [label=" 更新状态", style=dashed]; Worker2 -> Metadata_DB [label=" 更新状态", style=dashed]; WorkerN -> Metadata_DB [label=" 更新状态", style=dashed]; Worker1 -> Data_Store [label=" 存储内容"]; Worker2 -> Data_Store [label=" 存储内容"]; WorkerN -> Data_Store [label=" 存储内容"]; Worker1 -> Metadata_DB [label=" 检查是否已访问", style=dashed, constraint=false]; }一个分布式爬取系统的简化图。工作节点从中央队列获取URL,处理页面,存储内容,更新元数据,并可能发现新的URL并将其添加回队列。实现负责任的爬取激进的抓取可能导致网站过载,影响其对合法用户的可用性,并可能导致您的IP地址被封锁。大规模爬取必须负责任地进行。robots.txt大多数网站提供一个 /robots.txt 文件,概述了对自动化代理(机器人/爬虫)的规则。这些规则指定了网站的哪些部分不应被访问(Disallow),有时会建议一个推荐的爬取延迟。遵守 robots.txt 是道德抓取的一个基本方面。Python 的 urllib.robotparser 可以提供帮助:from urllib.robotparser import RobotFileParser from urllib.parse import urljoin # 定义爬虫的User-Agent字符串 # 尽可能具体并提供联系方式 USER_AGENT = "MyLLMDataCrawler/1.0 (+http://mycrawlerinfo.example.com)" def can_fetch_url(robot_parser, url): """检查URL是否被robots.txt允许,供我们的用户代理使用。""" try: return robot_parser.can_fetch(USER_AGENT, url) except Exception as e: # 优雅地处理潜在的解析错误 print(f"检查 {url} 的robots.txt权限时出错: {e}") return False # 不确定时默认为不抓取 # --- 爬虫内部使用示例 --- robots_url_cache = {} # 缓存每个域的解析器对象,避免重复获取 async def check_and_fetch(session, url): # 简化:假设URL是格式正确的http/https base_url = urljoin(url, '/') robots_url = urljoin(base_url, 'robots.txt') parser = robots_url_cache.get(base_url) if not parser: print(f"正在获取 {base_url} 的robots.txt") parser = RobotFileParser() parser.set_url(robots_url) try: # 在真实的异步爬虫中,这里应该使用异步请求 # 为简单起见,这里使用了read(),这在异步代码中可能会阻塞 # 在aiohttp中,您会获取robots_url,然后调用parser.parse() # await parser.read() # # 理想的异步方式(需要对解析器进行异步适配) # --- 为示例清晰起见进行同步模拟 --- import requests response = requests.get( robots_url, timeout=5, headers={'User-Agent': USER_AGENT} ) if response.status_code == 200: parser.parse(response.text.splitlines()) else: print( f"未找到 {base_url} 的有效robots.txt " f"(状态: {response.status_code})" ) # --- 同步模拟结束 --- robots_url_cache[base_url] = parser # 即使获取失败或没有规则也进行缓存 except Exception as e: print(f"获取或解析 {base_url} 的robots.txt失败: {e}") robots_url_cache[base_url] = None # 标记为失败 return None # 未能检查robots.txt,无法继续 if parser and can_fetch_url(parser, url): print(f"允许获取: {url}") # 使用aiohttp(前面定义的fetch函数)进行实际获取 return await fetch(session, url) elif parser: print(f"被robots.txt禁止: {url}") return None else: # 处理robots.txt获取早前失败的情况 print(f"无法验证 {url} 的robots.txt权限,跳过。") return None # --- 异步环境中的示例调用 --- # async with aiohttp.ClientSession( # headers={'User-Agent': USER_AGENT}) as session: # await check_and_fetch( # session, "https://example.com/some/allowed/path") # await check_and_fetch( # session, "https://example.com/some/disallowed/path") # # 假设在robots.txt中被禁止频率限制即使 robots.txt 允许,每秒向单个服务器发送数百或数千个请求也可能使其不堪重负。对每个域名实施频率限制。固定延迟: 在对同一域名的请求之间添加一个简单的 await asyncio.sleep(delay_seconds)。Crawl-delay 指令: 有些 robots.txt 文件会建议一个延迟。如果存在,请遵守。自适应延迟: 监测服务器响应时间或错误率(如 HTTP 429 “请求过多”,503 “服务不可用”)。如果出现问题,自动增加延迟。并发连接限制: 限制对单个域名/IP地址的并发连接数。一个简单的按域名延迟机制:import asyncio import time # 存储每个域名的最后访问时间 last_access_times = {} # 对同一域名请求之间的最小延迟(秒) MIN_DELAY_PER_DOMAIN = 1.0 async def rate_limited_fetch(session, url): # 需要 'from urllib.parse import urlparse' domain = urlparse(url).netloc last_access = last_access_times.get(domain, 0) now = time.monotonic() elapsed = now - last_access if elapsed < MIN_DELAY_PER_DOMAIN: wait_time = MIN_DELAY_PER_DOMAIN - elapsed print(f"对 {domain} 进行频率限制: 等待 {wait_time:.2f} 秒") await asyncio.sleep(wait_time) # 在发起请求*之前*更新最后访问时间 last_access_times[domain] = time.monotonic() # 现在执行实际的获取操作 # (使用 'fetch' 或 'check_and_fetch' 等函数) result = await fetch(session, url) # 假设 'fetch' 处理异常 return resultUser-Agent 标识始终设置清晰的 User-Agent 字符串,以标识您的爬虫,理想情况下提供联系方式(例如,带有信息或电子邮件地址的URL)。这有助于网站管理员了解流量来源,并在您的爬虫导致问题时与您联系。避免使用通用浏览器用户代理。示例:User-Agent: LLMBuilderBot/0.1 (+http://www.my-llm-project.org/crawler-info)处理规模挑战除了礼貌爬取之外,大规模抓取还会带来其他技术障碍:动态内容(JavaScript): 许多现代网站在初始HTML页面加载后使用JavaScript加载内容。简单的HTTP请求无法捕获这些数据。需要像 Playwright 或 Selenium 这样控制真实浏览器引擎的工具。然而,它们比简单的HTTP客户端资源消耗明显更高(CPU、内存),会减缓爬取速度并增加成本。仅在必要时选择性使用它们。爬虫陷阱: 网站可能有意或无意地创建“爬虫陷阱”——动态生成链接的无限循环(例如,日历中无休止的“下个月”链接),可以困住一个缺乏经验的爬虫。实施诸如每个域的最大爬取深度、URL模式过滤或监控从单个页面/域发现的URL数量等机制。会话管理与登录: 访问需要登录的内容需要处理认证(cookie、令牌),这增加了复杂性并可能违反服务条款。除非明确允许,大规模预训练数据收集通常会避免这种情况。错误处理: 网络故障、服务器错误(HTTP 5xx)、客户端错误(HTTP 4xx)和超时在大规模操作中很常见。对超时或503等瞬时错误实施重试逻辑,通常采用指数退避(每次失败后等待时间逐渐延长)。对于持久性错误(如 404 Not Found),在几次重试后放弃。构建一个可伸缩的网页抓取器是一项重要的软件工程任务。虽然库和框架提供了构建模块,但仔细的设计,考虑并发、分布式、存储、礼貌性和错误处理,对于成功收集训练大型语言模型所需的庞大数据是必要的。请记住,数据质量也十分重要;抓取的原始输出通常需要大量清洗和过滤,如第7章(“数据清洗和预处理流程”)所述。