tinyrpc/src/net/reactor.cc

265 lines
6.9 KiB
C++
Raw Normal View History

2024-12-20 21:17:21 +08:00
#include "reactor.hpp"
#include "fd_event.hpp"
#include "logger.hpp"
#include "coroutine_hook.hpp"
#include <cassert>
#include <cstring>
#include <fcntl.h>
#include <mutex>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <vector>
namespace tinyrpc {
// Function pointers defined in another translation unit. They are declared
// here but not referenced by any code visible in this file.
// NOTE(review): presumably the un-hooked system read/write entry points
// provided by coroutine_hook — confirm against that module.
extern read_fun_ptr_t g_sys_read_fun;
extern write_fun_ptr_t g_sys_write_fun;
// Capacity of the epoll_event buffer that Reactor::loop() hands to epoll_wait(),
// i.e. the maximum number of ready events fetched per wake-up.
static const int EPOLL_EVENT_MAX_LEN = 16;
// Per-thread pointer to the Reactor constructed on that thread. The constructor
// stores `this` here (and refuses to build a second reactor on the same thread);
// the destructor resets it to nullptr.
static thread_local Reactor *t_reactor = nullptr;
/**
 * Builds the reactor owned by the calling thread.
 *
 * Creates the epoll instance and a non-blocking eventfd used to wake the
 * loop, registers the eventfd with epoll, and records this object as the
 * calling thread's reactor. Any failure is logged and terminates the process
 * with exit(-1), as does an attempt to create a second reactor on a thread
 * that already has one.
 */
Reactor::Reactor()
{
    // Only one reactor may exist per thread.
    if (t_reactor != nullptr) {
        logger() << "this thread has already create a reactor";
        exit(-1);
    }

    m_tid = gettid();

    m_epfd = epoll_create(5);
    if (m_epfd == -1) {
        logger() << "epoll create error!";
        exit(-1);
    }

    // EFD_NONBLOCK already makes the descriptor non-blocking, so no separate
    // setNonblock() step is required.
    m_rousefd = eventfd(0, EFD_NONBLOCK);
    // Validate the eventfd descriptor itself (m_epfd was already checked above).
    if (m_rousefd == -1) {
        logger() << "eventfd create error!";
        exit(-1);
    }

    addRouseFd(m_rousefd);
    t_reactor = this;
}
/**
 * Registers a wake-up descriptor with this reactor's epoll instance so that
 * it reports readability (EPOLLIN).
 *
 * @param eventFd descriptor to watch; the constructor passes m_rousefd.
 *
 * The boolean result of addFd() is not inspected here; addFd() itself logs
 * when the registration fails.
 */
void Reactor::addRouseFd(int eventFd) {
    epoll_event wakeEvent;
    wakeEvent.data.fd = eventFd;
    wakeEvent.events = EPOLLIN;
    addFd(eventFd, wakeEvent);
}
/**
 * Adds a descriptor to this reactor's epoll interest list.
 *
 * @param fd descriptor to register.
 * @param ev event mask and user data to associate with fd.
 * @return true when epoll_ctl(EPOLL_CTL_ADD) succeeds; false otherwise, after
 *         logging the errno description.
 */
bool Reactor::addFd(int fd, epoll_event ev) {
    if (epoll_ctl(m_epfd, EPOLL_CTL_ADD, fd, &ev) != -1) {
        return true;
    }
    logger() << "epoll_ctl add ret -1 err:" << strerror(errno);
    return false;
}
/**
 * Removes a descriptor from this reactor's epoll interest list.
 *
 * @param fd descriptor to unregister.
 * @return true when epoll_ctl(EPOLL_CTL_DEL) succeeds; false otherwise, after
 *         logging the errno description.
 */
bool Reactor::delFd(int fd) {
    // A zero-initialised event is still passed: per epoll_ctl(2) the argument
    // is ignored for DEL, but a non-null pointer is the portable choice.
    epoll_event placeholder{};
    const bool removed = epoll_ctl(m_epfd, EPOLL_CTL_DEL, fd, &placeholder) != -1;
    if (!removed) {
        logger() << "epoll_ctl del ret -1 err:" << strerror(errno);
    }
    return removed;
}
/**
 * Replaces the event settings of a descriptor already present in this
 * reactor's epoll interest list.
 *
 * @param fd descriptor whose registration is updated.
 * @param ev new event mask and user data for fd.
 * @return true when epoll_ctl(EPOLL_CTL_MOD) succeeds; false otherwise, after
 *         logging the errno description.
 */
bool Reactor::modFd(int fd, epoll_event ev) {
    const int rc = epoll_ctl(m_epfd, EPOLL_CTL_MOD, fd, &ev);
    if (rc != -1) {
        return true;
    }
    logger() << "epoll_ctl mod ret -1 err:" << strerror(errno);
    return false;
}
void Reactor::processAllTasks() {
std::vector<Task> tmpTasks;
{
std::lock_guard<std::mutex> lock(m_tasks_mtx);
tmpTasks.swap(m_tasks);
}
for(Task task : tmpTasks) {
task();
}
if(!tmpTasks.empty()) {
rouse();
}
}
void Reactor::loop() {
if(m_is_looping) {
logger() << "The reactor is already looping";
return;
}
epoll_event events[EPOLL_EVENT_MAX_LEN]{};
m_is_looping = true;
while(!m_is_stop) {
processAllTasks();
logger() << "before epoll_wait";
int num = epoll_wait(m_epfd, events, EPOLL_EVENT_MAX_LEN, -1);
logger() << "wakeup";
if(num < 0) {
logger() << "epoll_wait ret -1 err:" << strerror(errno);
continue;
}
for(int i = 0; i < num; i++) {
int curFd = events[i].data.fd;
if(curFd == m_rousefd) {
eventfd_t val = 0;
if(eventfd_read(curFd, &val) == -1) {
logger() << "eventfd_read ret -1 err:" << strerror(errno);
}
continue;
}
if(m_listen_fd_events.count(curFd) == 0) {
logger() << "unknow fd:" << curFd <<", skip";
continue;
}
if(events[i].events & EPOLLIN) {
Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::READ);
{
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(cb);
}
}
if(events[i].events & EPOLLOUT) {
Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::WRITE);
{
std::lock_guard<std::mutex> lock(m_tasks_mtx);
m_tasks.push_back(cb);
}
}
}
}
m_is_looping = false;
}
void Reactor::rouse() {
// logger() << "rouse call";
eventfd_t val = 1;
eventfd_write(m_rousefd, val);
}
/**
 * Requests termination of a running event loop.
 *
 * Does nothing when the loop is not running. Otherwise raises the stop flag
 * that loop() tests in its `while (!m_is_stop)` condition and wakes the loop
 * so a blocked epoll_wait() returns and the flag is observed.
 */
void Reactor::stop() {
    if (!m_is_looping) {
        return;
    }
    // The flag must be set to true: loop() keeps running while it is false.
    m_is_stop = true;
    rouse();
}
/**
 * Starts watching the descriptor described by fdEvent.
 *
 * @param fdEvent non-null event object supplying the descriptor (getFd) and
 *                the epoll event mask (getEvent). The reactor stores the raw
 *                pointer in m_listen_fd_events; it does not take ownership in
 *                this function.
 *
 * If the descriptor is already tracked, a message is logged and nothing
 * changes. The registration runs immediately when called on the thread whose
 * reactor this is; otherwise it is queued on m_tasks for the loop to run.
 * rouse() is called in both cases.
 *
 * NOTE(review): the membership check below reads m_listen_fd_events without
 * holding a lock, also when called from a foreign thread — confirm that
 * callers' threading makes this safe.
 */
void Reactor::addFdEvent(FdEvent* fdEvent) {
    assert(fdEvent);
    if (m_listen_fd_events.count(fdEvent->getFd())) {
        logger() << "the fd already exist";
        return;
    }

    // Capture the pointer by value: a queued task runs after this function has
    // returned, when a reference to the parameter would be dangling.
    Task task = [this, fdEvent] {
        const int fd = fdEvent->getFd();
        epoll_event ev{};
        ev.events = fdEvent->getEvent();
        ev.data.fd = fd;
        addFd(fd, ev);
        m_listen_fd_events.insert({fd, fdEvent});
    };

    if (this == t_reactor) { // Same thread as the reactor: run right away.
        task();
    } else {
        std::lock_guard<std::mutex> lock(m_tasks_mtx);
        m_tasks.push_back(task);
    }
    rouse();
}
/**
 * Stops watching the descriptor described by fdEvent.
 *
 * @param fdEvent non-null event object whose descriptor (getFd) is removed
 *                from epoll and from m_listen_fd_events.
 *
 * If the descriptor is not tracked, a message is logged and nothing changes.
 * The removal runs immediately when called on the thread whose reactor this
 * is; otherwise it is queued on m_tasks for the loop to run. rouse() is
 * called in both cases.
 *
 * NOTE(review): when the removal is queued, fdEvent must stay alive until the
 * loop has executed the task — confirm callers guarantee this.
 */
void Reactor::delFdEvent(FdEvent* fdEvent) {
    assert(fdEvent);
    if (m_listen_fd_events.count(fdEvent->getFd()) == 0) {
        logger() << "the fd is not exist";
        return;
    }

    // Capture the pointer by value: a queued task runs after this function has
    // returned, when a reference to the parameter would be dangling.
    Task task = [this, fdEvent] {
        const int fd = fdEvent->getFd();
        delFd(fd);
        m_listen_fd_events.erase(fd);
    };

    if (this == t_reactor) { // Same thread as the reactor: run right away.
        task();
    } else {
        std::lock_guard<std::mutex> lock(m_tasks_mtx);
        m_tasks.push_back(task);
    }
    rouse();
}
/**
 * Re-applies the event mask of an already watched descriptor.
 *
 * @param fdEvent non-null event object supplying the descriptor (getFd) and
 *                the new epoll event mask (getEvent).
 *
 * If the descriptor is not tracked, a message is logged and nothing changes.
 * The update runs immediately when called on the thread whose reactor this
 * is; otherwise it is queued on m_tasks for the loop to run. rouse() is
 * called in both cases. The event mask is read when the task executes, not
 * when it is queued.
 */
void Reactor::modFdEvent(FdEvent* fdEvent) {
    assert(fdEvent);
    if (m_listen_fd_events.count(fdEvent->getFd()) == 0) {
        logger() << "the fd is not exist";
        return;
    }

    // Capture the pointer by value: a queued task runs after this function has
    // returned, when a reference to the parameter would be dangling.
    Task task = [this, fdEvent] {
        const int fd = fdEvent->getFd();
        epoll_event ev{};
        ev.events = fdEvent->getEvent();
        ev.data.fd = fd;
        modFd(fd, ev);
    };

    if (this == t_reactor) { // Same thread as the reactor: run right away.
        task();
    } else {
        std::lock_guard<std::mutex> lock(m_tasks_mtx);
        m_tasks.push_back(task);
    }
    rouse();
}
/**
 * Tears the reactor down: raises the stop flag, closes the epoll and wake-up
 * descriptors, and clears the calling thread's reactor pointer when it refers
 * to this object.
 *
 * The loop is not woken here, so a loop still blocked in epoll_wait() on
 * another thread is not interrupted by destruction.
 */
Reactor::~Reactor()
{
    m_is_stop = true;
    close(m_epfd);
    close(m_rousefd);
    // Only unregister ourselves; if destruction happens on a thread that owns a
    // different reactor, that thread's registration must stay intact.
    if (t_reactor == this) {
        t_reactor = nullptr;
    }
}
}