From ‘Hello World’ to a Web Crawler: Building a Small Concurrent C++ Project with Boost.Asio, Step by Step
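When learning to program, the jump from a simple "Hello World" to a real project is often daunting, and network programming and concurrency are where many developers get stuck. This article uses Boost.Asio, a powerful C++ library, to build a concurrent HTTP client step by step. That client is essentially the skeleton of a simple web crawler.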
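Boost.Asio is the Boost library for cross-platform network and low-level I/O programming in C++. It provides a consistent asynchronous model that covers TCP/UDP sockets, timers, file descriptors, and more. Compared with raw socket programming, Asio's asynchronous model makes concurrent network code much easier to structure. We start with a basic HTTP request, add concurrency step by step, and finish with a small crawler that can handle several requests at once.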
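1. Environment Setup and Boost.Asio Basics
Before starting the project, make sure the Boost libraries are installed. On Debian or Ubuntu, the package manager installs them quickly: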
sudo apt-get install libboost-all-dev
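If you prefer to build from source, the steps are roughly:
- Download the latest Boost source release
- Unpack it and change into its directory
- Run ./bootstrap.sh
- Build and install: ./b2 && sudo ./b2 install
Tip: building Boost can take a long time. Use the -j option to build in parallel, e.g. ./b2 -j4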
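The core concepts of Boost.Asio are:
- io_context: the I/O execution context. Asynchronous operations are scheduled through it, and their completion handlers run inside io_context::run()
- socket: an endpoint for network communication
- resolver: translates a host name and a service name into endpoints
- Asynchronous operation model: asynchronous programming based on callbacks or coroutines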
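Let's start with the simplest possible synchronous HTTP client:
#include <iostream>
#include <string>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

void fetch_url(const std::string& host, const std::string& path) {
    boost::asio::io_context io;
    tcp::resolver resolver(io);
    tcp::socket socket(io);

    // Resolve the host name and the "http" service (port 80), then connect
    auto endpoints = resolver.resolve(host, "http");
    boost::asio::connect(socket, endpoints);

    // Send the HTTP request
    std::string request = "GET " + path + " HTTP/1.1\r\n"
                          "Host: " + host + "\r\n"
                          "Connection: close\r\n\r\n";
    boost::asio::write(socket, boost::asio::buffer(request));

    // Read at least up to the end of the status line
    boost::asio::streambuf response;
    boost::asio::read_until(socket, response, "\r\n");

    // Parse and print the status line, e.g. "HTTP/1.1 200 OK"
    std::istream response_stream(&response);
    std::string http_version;
    unsigned int status_code = 0;
    std::string status_message;  // keeps its leading space, e.g. " OK"
    response_stream >> http_version >> status_code;
    std::getline(response_stream, status_message);
    if (!status_message.empty() && status_message.back() == '\r') status_message.pop_back();
    std::cout << "Response: " << status_code << status_message << "\n";
}

int main() {
    try {
        fetch_url("www.example.com", "/");
    } catch (const std::exception& e) {
        // The synchronous calls throw boost::system::system_error on failure
        std::cerr << "Error: " << e.what() << "\n";
        return 1;
    }
    return 0;
}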
This small example shows the basic flow of an HTTP request with Boost.Asio. Next, we turn it into an asynchronous version.
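The status-line handling is the part of the synchronous client most worth reusing. Here is a minimal sketch of it as a standalone helper. The names parse_status_line and StatusLine are hypothetical, not Asio API. The helper also strips the leading space and the trailing '\r' that std::getline leaves in the reason phrase:

```cpp
#include <optional>
#include <sstream>
#include <string>

struct StatusLine {
    std::string http_version;
    unsigned int status_code = 0;
    std::string reason;
};

// Parses a status line such as "HTTP/1.1 200 OK\r\n"; returns std::nullopt if malformed.
std::optional<StatusLine> parse_status_line(const std::string& line) {
    std::istringstream in(line);
    StatusLine s;
    in >> s.http_version >> s.status_code;
    if (!in || s.http_version.rfind("HTTP/", 0) != 0) return std::nullopt;
    std::getline(in, s.reason);
    // getline keeps the space after the code and the '\r' of "\r\n"; drop both
    if (!s.reason.empty() && s.reason.front() == ' ') s.reason.erase(0, 1);
    if (!s.reason.empty() && s.reason.back() == '\r') s.reason.pop_back();
    return s;
}
```

For example, parse_status_line("HTTP/1.1 404 Not Found\r\n") yields status_code 404 and reason "Not Found".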
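2. Asynchronous Programming and Callbacks
Synchronous code is simple, but every call blocks the calling thread until the operation finishes, so one thread can only serve one connection at a time. The real strength of Boost.Asio is its asynchronous model. Let's refactor the code above to use asynchronous operations: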
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>

using boost::asio::ip::tcp;

class HTTPClient {
public:
    HTTPClient(boost::asio::io_context& io, const std::string& host, const std::string& path)
        : resolver_(io), socket_(io), host_(host), path_(path) {
        start_resolve();
    }

private:
    void start_resolve() {
        resolver_.async_resolve(host_, "http",
            boost::bind(&HTTPClient::handle_resolve, this,
                        boost::asio::placeholders::error,
                        boost::asio::placeholders::results));
    }

    void handle_resolve(const boost::system::error_code& err,
                        const tcp::resolver::results_type& endpoints) {
        if (!err) {
            boost::asio::async_connect(socket_, endpoints,
                boost::bind(&HTTPClient::handle_connect, this,
                            boost::asio::placeholders::error));
        } else {
            std::cerr << "Resolve error: " << err.message() << "\n";
        }
    }

    void handle_connect(const boost::system::error_code& err) {
        if (!err) {
            // The request is a member: async_write does not copy the buffer,
            // so the data must stay alive until the write completes
            request_ = "GET " + path_ + " HTTP/1.1\r\n"
                       "Host: " + host_ + "\r\n"
                       "Connection: close\r\n\r\n";
            boost::asio::async_write(socket_, boost::asio::buffer(request_),
                boost::bind(&HTTPClient::handle_write, this,
                            boost::asio::placeholders::error));
        } else {
            std::cerr << "Connect error: " << err.message() << "\n";
        }
    }

    void handle_write(const boost::system::error_code& err) {
        if (!err) {
            boost::asio::async_read_until(socket_, response_, "\r\n",
                boost::bind(&HTTPClient::handle_read_status, this,
                            boost::asio::placeholders::error));
        } else {
            std::cerr << "Write error: " << err.message() << "\n";
        }
    }

    void handle_read_status(const boost::system::error_code& err) {
        if (!err) {
            std::istream response_stream(&response_);
            std::string http_version;
            unsigned int status_code = 0;
            std::string status_message;
            response_stream >> http_version >> status_code;
            std::getline(response_stream, status_message);
            if (!status_message.empty() && status_message.back() == '\r') status_message.pop_back();
            std::cout << "Response: " << status_code << status_message << "\n";
            // Keep reading the rest of the response (headers and body)
            boost::asio::async_read(socket_, response_,
                boost::asio::transfer_at_least(1),
                boost::bind(&HTTPClient::handle_read_content, this,
                            boost::asio::placeholders::error));
        } else {
            std::cerr << "Read error: " << err.message() << "\n";
        }
    }

    void handle_read_content(const boost::system::error_code& err) {
        // Print whatever is buffered, including data that arrived just before EOF
        if (response_.size() > 0) std::cout << &response_;
        if (!err) {
            // Keep reading until the server closes the connection
            boost::asio::async_read(socket_, response_,
                boost::asio::transfer_at_least(1),
                boost::bind(&HTTPClient::handle_read_content, this,
                            boost::asio::placeholders::error));
        } else if (err != boost::asio::error::eof) {
            std::cerr << "Read error: " << err.message() << "\n";
        }
    }

    tcp::resolver resolver_;
    tcp::socket socket_;
    std::string request_;
    boost::asio::streambuf response_;
    std::string host_;
    std::string path_;
};

int main() {
    boost::asio::io_context io;
    HTTPClient client(io, "www.example.com", "/");
    io.run();  // returns once no asynchronous work is left
    return 0;
}
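The asynchronous version is longer, but none of its calls block: each operation returns immediately, and the corresponding callback is invoked when it completes. A single thread running io.run() can therefore drive many connections at the same time.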
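3. Adding a Thread Pool for Concurrent Requests
A single asynchronous client already uses its thread more efficiently than the synchronous version. To take advantage of multiple CPU cores, however, we need a thread pool. Boost.Asio combines easily with one, because several threads can call run() on the same io_context: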
#include <chrono>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>

using boost::asio::ip::tcp;

class ConcurrentHTTPClient {
public:
    explicit ConcurrentHTTPClient(int thread_count = 4)
        : work_guard_(boost::asio::make_work_guard(io_)) {
        for (int i = 0; i < thread_count; ++i) {
            threads_.emplace_back([this]() { io_.run(); });
        }
    }

    ~ConcurrentHTTPClient() {
        // Stop the event loop; requests still in flight at this point are abandoned
        io_.stop();
        for (auto& t : threads_) {
            if (t.joinable()) t.join();
        }
    }

    void fetch(const std::string& host, const std::string& path) {
        boost::asio::post(io_,
            [this, host, path]() {
                std::make_shared<HTTPClient>(io_, host, path)->start();
            });
    }

private:
    class HTTPClient : public std::enable_shared_from_this<HTTPClient> {
    public:
        HTTPClient(boost::asio::io_context& io, const std::string& host, const std::string& path)
            : resolver_(io), socket_(io), host_(host), path_(path) {}

        void start() {
            start_resolve();
        }

    private:
        // ... HTTPClient implementation from the previous section
        //     (handlers should bind shared_from_this() rather than this) ...
    };

    boost::asio::io_context io_;
    std::vector<std::thread> threads_;
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
};

int main() {
    ConcurrentHTTPClient client;
    // Issue several requests concurrently
    client.fetch("www.example.com", "/");
    client.fetch("www.boost.org", "/doc/libs/");
    client.fetch("en.cppreference.com", "/w/");
    // The main thread is free to do other work in the meantime
    std::this_thread::sleep_for(std::chrono::seconds(5));
    return 0;
}
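Key points of this implementation:
- Work guard: executor_work_guard keeps io_context::run() from returning while no work is pending yet, so the pool threads stay alive until requests are posted
- Thread pool: several threads share one io_context and run completion handlers in parallel. Handlers of different clients can execute at the same time, so any state shared between clients needs a strand or a mutex
- Lifetime managed by shared pointers: std::enable_shared_from_this, combined with handlers that capture shared_from_this(), keeps each client object alive until its last callback has run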
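4. Building a Simple Web Crawler
Now we combine the components above into a simple web crawler. It will:
- Start from an initial URL
- Extract the links on each page
- Add new links to the work queue
- Process several pages concurrently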
#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <regex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;

class SimpleCrawler {
public:
    explicit SimpleCrawler(int thread_count = 4)
        : work_guard_(boost::asio::make_work_guard(io_)) {
        for (int i = 0; i < thread_count; ++i) {
            threads_.emplace_back([this]() { io_.run(); });
        }
    }

    ~SimpleCrawler() {
        io_.stop();
        for (auto& t : threads_) {
            if (t.joinable()) t.join();
        }
    }

    void start(const std::string& initial_url) {
        add_url(initial_url);
    }

private:
    void add_url(const std::string& url) {
        // Only plain http:// is accepted: fetch_page connects to port 80 without TLS
        static const std::regex url_regex(R"(^http://([^/]+)(/.*)?$)");
        std::smatch match;
        if (!std::regex_match(url, match, url_regex)) return;
        {
            // add_url runs on several io_context threads, so visited_ needs a lock
            std::lock_guard<std::mutex> lock(visited_mutex_);
            if (!visited_.insert(url).second) return;  // already seen
        }
        std::string host = match[1];
        std::string path = match[2].matched ? match[2].str() : "/";
        boost::asio::post(io_, [this, host, path]() { fetch_page(host, path); });
    }

    void fetch_page(const std::string& host, const std::string& path) {
        // One resolver per request: a resolver object must not be used from
        // several threads at once
        auto resolver = std::make_shared<tcp::resolver>(io_);
        auto socket = std::make_shared<tcp::socket>(io_);
        resolver->async_resolve(host, "http",
            [this, resolver, socket, host, path](const boost::system::error_code& err,
                                                 const tcp::resolver::results_type& endpoints) {
                if (err) return;
                boost::asio::async_connect(*socket, endpoints,
                    [this, socket, host, path](const boost::system::error_code& err,
                                               const tcp::endpoint&) {
                        if (err) return;
                        // Heap-allocated so the buffer outlives this handler until the write completes
                        auto request = std::make_shared<std::string>(
                            "GET " + path + " HTTP/1.1\r\n"
                            "Host: " + host + "\r\n"
                            "Connection: close\r\n\r\n");
                        boost::asio::async_write(*socket, boost::asio::buffer(*request),
                            [this, socket, request](const boost::system::error_code& err, std::size_t) {
                                if (err) return;
                                auto response = std::make_shared<boost::asio::streambuf>();
                                boost::asio::async_read_until(*socket, *response, "\r\n",
                                    [this, socket, response](const boost::system::error_code& err, std::size_t) {
                                        if (err) return;
                                        std::istream response_stream(response.get());
                                        std::string http_version;
                                        unsigned int status_code = 0;
                                        response_stream >> http_version >> status_code;
                                        if (status_code == 200) {
                                            read_response_body(socket, response);
                                        }
                                    });
                            });
                    });
            });
    }

    void read_response_body(const std::shared_ptr<tcp::socket>& socket,
                            const std::shared_ptr<boost::asio::streambuf>& response) {
        boost::asio::async_read(*socket, *response,
            boost::asio::transfer_at_least(1),
            [this, socket, response](const boost::system::error_code& err, std::size_t) {
                if (!err) {
                    // Keep reading: with "Connection: close" the response ends at EOF
                    read_response_body(socket, response);
                } else if (err == boost::asio::error::eof) {
                    // Parse the whole buffered response at once, so that no link
                    // is split across two reads
                    std::string content(boost::asio::buffers_begin(response->data()),
                                        boost::asio::buffers_end(response->data()));
                    // Simple link extraction (absolute links only)
                    static const std::regex link_regex(R"(href="(https?://[^"]+)")");
                    for (std::sregex_iterator it(content.begin(), content.end(), link_regex), end;
                         it != end; ++it) {
                        add_url((*it)[1]);
                    }
                } else {
                    std::cerr << "Error: " << err.message() << "\n";
                }
            });
    }

    boost::asio::io_context io_;
    boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work_guard_;
    std::vector<std::thread> threads_;
    std::mutex visited_mutex_;
    std::unordered_set<std::string> visited_;
};

int main() {
    SimpleCrawler crawler;
    crawler.start("http://www.example.com");
    // Let the crawler run for a while; the destructor then stops it
    std::this_thread::sleep_for(std::chrono::seconds(10));
    return 0;
}
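This simple crawler shows how Boost.Asio can be used in a real project. It is minimal, but it already contains the core elements of a web crawler:
- Concurrent request handling
- URL deduplication
- Link extraction
- Asynchronous I/O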
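URL deduplication in the crawler compares raw strings. As a result, http://www.example.com, http://www.example.com/ and http://WWW.example.com/#top all count as different pages. Below is a light normalization sketch. The name normalize_http_url is hypothetical, and it is not a full RFC 3986 normalizer. It lowercases the scheme and host, drops the fragment, and ensures the path starts with "/":

```cpp
#include <algorithm>
#include <cctype>
#include <optional>
#include <regex>
#include <string>

// Canonical form of an http:// URL for the visited set, or std::nullopt for
// anything else (the crawler only speaks plain HTTP).
std::optional<std::string> normalize_http_url(const std::string& url) {
    // groups: 1 = authority, 2 = path and query, 3 = optional fragment
    static const std::regex url_regex(R"(^http://([^/?#]+)([^#]*)(#.*)?$)",
                                      std::regex::ECMAScript | std::regex::icase);
    std::smatch m;
    if (!std::regex_match(url, m, url_regex)) return std::nullopt;
    std::string host = m[1].str();
    std::transform(host.begin(), host.end(), host.begin(),
                   [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
    std::string rest = m[2].str();
    if (rest.empty() || rest[0] != '/') rest.insert(0, "/");  // "" -> "/", "?q=1" -> "/?q=1"
    return "http://" + host + rest;
}
```

In add_url, the normalized string (when present) would be inserted into visited_ in place of the raw URL.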
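5. Performance Optimization and Error Handling
Our basic crawler works, but a real-world crawler has more to deal with. Here are some directions for improvement:
Connection pooling
Opening and closing a TCP connection for every request adds a connection handshake each time. A simple connection pool can reuse sockets, provided the server keeps the connection open afterwards: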
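#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;

// One pool per host: a pooled socket is still connected to host_
class ConnectionPool {
public:
    ConnectionPool(boost::asio::io_context& io, const std::string& host)
        : io_(io), host_(host) {}

    // Returns an idle connected socket if there is one, otherwise a new,
    // unconnected socket that the caller must connect to host_
    std::shared_ptr<tcp::socket> acquire() {
        std::lock_guard<std::mutex> lock(mutex_);  // called from several io_context threads
        if (!pool_.empty()) {
            auto socket = pool_.back();
            pool_.pop_back();
            return socket;
        }
        return std::make_shared<tcp::socket>(io_);
    }

    // Keeps only sockets that are still open (the server kept the connection alive)
    void release(std::shared_ptr<tcp::socket> socket) {
        if (socket->is_open()) {
            std::lock_guard<std::mutex> lock(mutex_);
            pool_.push_back(std::move(socket));
        }
    }

private:
    boost::asio::io_context& io_;
    std::string host_;
    std::mutex mutex_;
    std::vector<std::shared_ptr<tcp::socket>> pool_;
};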
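Reusing a pooled socket only pays off with persistent connections. HTTP/1.1 connections are persistent by default unless a side sends Connection: close, which every request above does. On a kept-alive connection, the client can no longer read until EOF. Instead, it must find the end of the body from Content-Length (or from chunked Transfer-Encoding, not handled here). Here is a minimal sketch for the Content-Length case; content_length is a hypothetical helper:

```cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <optional>
#include <sstream>
#include <string>

// Returns the Content-Length value from a raw header block ("Name: value\r\n" lines),
// or std::nullopt if the header is absent or does not start with a digit.
std::optional<std::size_t> content_length(const std::string& headers) {
    std::istringstream in(headers);
    std::string line;
    while (std::getline(in, line)) {
        if (!line.empty() && line.back() == '\r') line.pop_back();
        auto colon = line.find(':');
        if (colon == std::string::npos) continue;  // status line or blank line
        std::string name = line.substr(0, colon);
        // Header names are case-insensitive
        std::transform(name.begin(), name.end(), name.begin(),
                       [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
        if (name != "content-length") continue;
        std::string value = line.substr(colon + 1);
        value.erase(0, value.find_first_not_of(" \t"));
        if (value.empty() || !std::isdigit(static_cast<unsigned char>(value[0]))) return std::nullopt;
        return static_cast<std::size_t>(std::stoull(value));
    }
    return std::nullopt;
}
```

With the body length known, the client can read exactly that many bytes after the header block and then release the socket back to the pool.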
Timeouts
A network request can hang for many reasons, so we need a timeout mechanism:
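// Member function of the crawler (uses its io_). Because io_ is run by several
// threads, the resolver, socket and timer share one strand, so the timeout
// handler never runs at the same time as an I/O handler.
void fetch_with_timeout(const std::string& host, const std::string& path) {
    auto strand = boost::asio::make_strand(io_);
    auto resolver = std::make_shared<tcp::resolver>(strand);
    auto socket = std::make_shared<tcp::socket>(strand);
    auto timer = std::make_shared<boost::asio::steady_timer>(strand);

    // Deadline for resolving and connecting
    timer->expires_after(std::chrono::seconds(5));
    timer->async_wait([resolver, socket](const boost::system::error_code& ec) {
        if (!ec) {                            // expired rather than cancelled
            resolver->cancel();               // aborts a pending async_resolve
            boost::system::error_code ignored;
            socket->close(ignored);           // aborts a pending async_connect
        }
    });

    resolver->async_resolve(host, "http",
        [this, resolver, socket, timer, host, path](const boost::system::error_code& err,
                                                    const tcp::resolver::results_type& endpoints) {
            if (err) {  // includes operation_aborted caused by the timeout
                timer->cancel();
                return;
            }
            boost::asio::async_connect(*socket, endpoints,
                [this, socket, timer, host, path](const boost::system::error_code& err,
                                                  const tcp::endpoint&) {
                    timer->cancel();
                    if (err) return;
                    // Continue with the request...
                    // (re-arm the timer here if writes and reads also need a deadline)
                });
        });
}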
Rate limiting
To avoid putting too much load on the target servers, requests should be rate-limited:
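#include <chrono>
#include <functional>
#include <mutex>
#include <queue>
#include <boost/asio.hpp>

// Token bucket: at most requests_per_second tokens, one token refilled every
// 1000 / requests_per_second ms (requests_per_second must be in [1, 1000]).
// Must outlive the threads that run the io_context.
class RateLimiter {
public:
    RateLimiter(boost::asio::io_context& io, int requests_per_second)
        : timer_(io),
          interval_(std::chrono::milliseconds(1000 / requests_per_second)),
          capacity_(requests_per_second),
          available_(requests_per_second) {
        schedule_tick();
    }

    // Runs callback now if a token is available, otherwise queues it
    void acquire(std::function<void()> callback) {
        {
            std::lock_guard<std::mutex> lock(mutex_);  // called from several threads
            if (available_ == 0) {
                queue_.push(std::move(callback));
                return;
            }
            --available_;
        }
        callback();  // invoked outside the lock
    }

private:
    void schedule_tick() {
        timer_.expires_after(interval_);
        timer_.async_wait([this](const boost::system::error_code& ec) {
            if (!ec) on_timer();  // operation_aborted: the timer was cancelled or destroyed
        });
    }

    void on_timer() {
        std::function<void()> ready;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (available_ < capacity_) ++available_;  // refill one token
            if (!queue_.empty()) {
                --available_;
                ready = std::move(queue_.front());
                queue_.pop();
            }
        }
        if (ready) ready();
        schedule_tick();
    }

    boost::asio::steady_timer timer_;
    std::chrono::milliseconds interval_;
    int capacity_;
    int available_;
    std::mutex mutex_;
    std::queue<std::function<void()>> queue_;
};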
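The refill rule is easiest to verify without timers. This sketch applies the token-bucket idea to explicit millisecond timestamps; token_bucket_admit is a hypothetical name. The bucket starts full, so up to capacity requests pass immediately. After that, one request is admitted per elapsed interval:

```cpp
#include <algorithm>
#include <vector>

// Token bucket over explicit timestamps: holds at most `capacity` tokens and earns
// one token per `interval_ms`. times_ms must be non-decreasing.
std::vector<bool> token_bucket_admit(int capacity, long long interval_ms,
                                     const std::vector<long long>& times_ms) {
    std::vector<bool> admitted;
    long long tokens = capacity;  // starts full: an initial burst of `capacity` is allowed
    long long last_refill = times_ms.empty() ? 0 : times_ms.front();
    for (long long t : times_ms) {
        long long earned = (t - last_refill) / interval_ms;  // whole intervals elapsed
        if (earned > 0) {
            tokens = std::min<long long>(capacity, tokens + earned);
            last_refill += earned * interval_ms;  // keep the partial interval for later
        }
        bool ok = tokens > 0;
        if (ok) --tokens;
        admitted.push_back(ok);
    }
    return admitted;
}
```

For capacity 2 and a 100 ms interval, requests at {0, 0, 0, 50, 100, 150, 300} ms are admitted as {true, true, false, false, true, false, true}.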
Error statistics and retries
Record errors and retry failed requests automatically:
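#include <atomic>
#include <memory>
#include <string>
#include <boost/asio.hpp>

struct ErrorStats {
    std::atomic<int> resolve_errors{0};
    std::atomic<int> connection_errors{0};
    std::atomic<int> timeout_errors{0};
    std::atomic<int> other_errors{0};
};

// Member function of the crawler (skeleton)
void fetch_with_retry(const std::string& host, const std::string& path,
                      int retry_count, std::shared_ptr<ErrorStats> stats) {
    auto attempt = [this, host, path, retry_count, stats](const boost::system::error_code& err) {
        if (err) {
            if (err == boost::asio::error::operation_aborted) {
                // Assumes the operation was cancelled by our own timeout timer
                stats->timeout_errors++;
            } else if (err.category() == boost::asio::error::get_netdb_category()) {
                // netdb errors come from host-name resolution (e.g. host_not_found)
                stats->resolve_errors++;
            } else if (err == boost::asio::error::connection_refused ||
                       err == boost::asio::error::host_unreachable ||
                       err == boost::asio::error::network_unreachable) {
                stats->connection_errors++;
            } else {
                stats->other_errors++;
            }
            if (retry_count > 0) {
                fetch_with_retry(host, path, retry_count - 1, stats);
            }
        }
    };
    // Start the request and pass attempt as its error callback
}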
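Applying these improvements to the crawler makes it considerably more robust and practical. A strength of Boost.Asio is that all of these features are built by composing the same basic asynchronous operations (resolvers, sockets, timers) rather than by managing threads by hand. Once io_context::run() is called from several threads, however, state shared between handlers still has to be protected with a mutex or a strand. Examples include the visited set, the connection pool, and the rate limiter's queue.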