什么是liburing
liburing是一个用于异步IO库,它提供了简洁易用的API来处理文件I/O、网络I/O以及事件驱动I/O等各种I/O操作。liburing库基于Linux内核中的io_uring特性实现,将I/O请求从应用层转移到内核层以提高应用程序的I/O性能。
由于liburing在内核版本5.1才引入,所以需要运行环境的linux内核版本大于等于5.1
为什么使用liburing
liburing库对于一些高并发、高吞吐量的程序,特别是网络服务器、云存储等高性能系统的设计和实现有很大的帮助作用。
如何使用
建议参考源码里的步骤自行编译安装,源码:https://github.com/axboe/liburing
常用函数介绍:
- io_uring_queue_init:用于初始化io_uring并且返回其句柄。
- io_uring_queue_exit:用于关闭并释放io_uring的句柄。
- io_uring_get_sqe:用于获取一个可用的sqe,即I/O请求对应的队列元素数据结构。
- io_uring_prep_readv:用于准备一个异步读请求。
- io_uring_prep_writev:用于准备一个异步写请求。
- io_uring_sqe_set_data:用于将用户私有数据关联到一个sqe(请求)中。
- io_uring_submit:用于提交一个或一批异步IO请求到io_uring。
- io_uring_peek_cqe:用于查看完成队列(cq)中的未处理项数量。
- io_uring_wait_cqe:阻塞等待一个处理完成的io。
- io_uring_cqe_get_data:用于获取特定的完成队列项(cqe),其中包含先前提交的IO请求的结果以及相关的私有数据。
- io_uring_cqe_seen:用于标记一个完成队列项(cqe)已被处理过。
下面使用liburing封装一个DiskUtil实现对文件的基本读写
编写disk_util.h头文件
#include <functional>
#include <string>
#include <liburing.h>
#include <thread>
enum IO_OP { OP_READ = 0, OP_WRITE = 1 };
struct IORequest {
IO_OP opcode; // 0: read, 1: write
char* buffer;
off_t offset;
size_t length;
std::function<void(int)> callback;
};
class DiskUtil {
public:
DiskUtil(const std::string& file_path, int block_size);
~DiskUtil();
void submit_request(IORequest* req);
void start();
void stop();
private:
void io_worker_thread();
int open_file();
void close_file();
void process_io_request(IORequest* req);
int submit_io_request(IORequest* req);
void complete_io_request();
private:
int fd_ = -1;
io_uring io_ring_;
const std::string file_path_;
const int block_size_;
std::thread io_worker_;
bool is_running_ = false;
};
编写disk_tool.cpp
#include <vector>
#include <string>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
#include <stdexcept>
#include <iostream>
#include <liburing.h>
#include "disk_util.h"
DiskUtil::DiskUtil(const std::string& file_path, int block_size)
: file_path_(file_path), block_size_(block_size) {
fd_ = open_file();
if (fd_ < 0) {
throw std::runtime_error("Failed to open file");
}
int ret = io_uring_queue_init(128, &io_ring_, 0);
if (ret < 0) {
close(fd_);
throw std::runtime_error("Failed to setup io ring, ret:" + std::to_string(ret));
}
is_running_ = true;
io_worker_ = std::thread(std::bind(&DiskUtil::io_worker_thread, this));
}
DiskUtil::~DiskUtil() {
stop();
io_uring_queue_exit(&io_ring_);
close_file();
}
void DiskUtil::submit_request(IORequest* req) {
submit_io_request(req);
}
void DiskUtil::start() {
is_running_ = true;
}
void DiskUtil::stop() {
if (is_running_) {
is_running_ = false;
if (io_worker_.joinable()) {
io_worker_.join();
}
}
}
int DiskUtil::open_file() {
int flags = O_RDWR | O_DIRECT | O_CREAT;
int mode = S_IRUSR | S_IWUSR;
return open(file_path_.c_str(), flags);
}
void DiskUtil::close_file() {
if (fd_ >= 0) {
close(fd_);
}
}
int DiskUtil::submit_io_request(IORequest* req) {
struct io_uring_sqe* sqe = io_uring_get_sqe(&io_ring_);
if (!sqe) {
return -1;
}
struct iovec iov;
iov.iov_base = req->buffer;
iov.iov_len = req->length;
if (req->opcode == OP_READ) {
io_uring_prep_readv(sqe, fd_, &iov, 1, req->offset);
} else if (req->opcode == OP_WRITE) {
io_uring_prep_writev(sqe, fd_, &iov, 1, req->offset);
}
io_uring_sqe_set_data(sqe, (void*)req);
io_uring_submit(&io_ring_);
return 0;
}
void DiskUtil::io_worker_thread() {
while (is_running_) {
struct io_uring_cqe* cqe = nullptr;
int ret = io_uring_wait_cqe(&io_ring_, &cqe);
if (ret < 0) {
std::cout << "io_uring_wait_cqe error:" << ret << std::endl;
return;
}
std::cout << "ret:" << ret << std::endl;
if (cqe->res < 0) {
std::cerr << "IO error: " << std::strerror(-cqe->res) << " ret_code:" << cqe->res << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
cqe = nullptr;
continue;
} else {
IORequest* req = (IORequest*)io_uring_cqe_get_data(cqe);
if (req->callback) {
req->callback(0);
}
}
io_uring_cqe_seen(&io_ring_, cqe);
}
}
下面是调用的例子
#include <cstring>
#include <cstdlib>
#include <iostream>
#include "disk_util.h"
#define BLOCK_SIZE 4096
void test_read(DiskUtil& disk, off_t offset, size_t length) {
char* buffer = nullptr;
int ret = posix_memalign((void**)&buffer, 4096, length);
if (ret < 0) {
std::cout << "malloc buffer error";
return;
}
IORequest* req = new IORequest;
req->opcode = OP_READ;
req->buffer = buffer;
req->offset = offset;
req->length = length;
req->callback = [req](int ret) {
if (ret == 0) {
std::cout << "Read complete, data:" << req->buffer << std::endl;
} else {
std::cout << "Read failed: " << ret << std::endl;
}
free(req->buffer);
delete req;
};
disk.submit_request(req);
}
void test_write(DiskUtil& disk, off_t offset, size_t length, const char* data) {
char* buffer = nullptr;
int ret = posix_memalign((void**)&buffer, 4096, length);
if (ret < 0) {
std::cout << "malloc buffer error";
return;
}
std::memcpy(buffer, data, length);
IORequest* req = new IORequest();
req->opcode = OP_WRITE;
req->buffer = buffer;
req->offset = offset;
req->length = length;
req->callback = [req](int ret) {
if (ret == 0) {
std::cout << "Write complete" << std::endl;
} else {
std::cout << "Write failed: " << ret << std::endl;
}
free(req->buffer);
delete req;
};
disk.submit_request(req);
}
int main() {
DiskUtil disk("test.disk", BLOCK_SIZE);
disk.start();
size_t length = 4096;
const char* data = "Hello world";
test_write(disk, 0, length, data);
std::this_thread::sleep_for(std::chrono::seconds(1));
test_read(disk, 0, length);
// std::system("rm test.disk");
disk.stop();
return 0;
}
注意点:
- 与libaio不同的是,liburing并不强制上层传递的buffer是4K对齐的,但是为了获取最佳性能,建议还是使用4K对齐的buffer。
- 实测在内核5.4版本上使用io_uring_prep_read和io_uring_prep_write方法会在调用io_uring_wait_cqe判断cqe->res的大小时报invalid argument,原因未知
完整代码参见github链接:https://github.com/lambertxiao/storage/tree/master/liburing
为什么liburing性能高
- 零拷贝:在传统的系统调用中,数据需要在用户空间和内核空间进行多次内存拷贝,而io_uring可以通过内存映射来实现零拷贝,将数据从内核的缓存区直接传输到用户空间,从而减少了内存拷贝带来的性能开销。
- 批处理I/O操作:io_uring使用批处理机制来减少系统调用的次数,可以将多个I/O操作打包成一个请求提交给内核进行处理,从而提高了系统吞吐量。
- 高效内存管理:io_uring在内核中使用了自身的内存池机制,能够更有效地管理内部的内存,避免了频繁的分配和释放内存所带来的性能损失。
- 锁控制:io_uring使用了内核层面的锁机制,避免了多线程I/O操作时的竞争情况,提高了系统的并发能力。
- io_uring的底层实现使用了ring buffer方式,可以根据需要进行高效的扩展,能够更好地应对高负载工作环境