2024-12-20 21:17:21 +08:00
|
|
|
#include "reactor.hpp"
|
|
|
|
#include "fd_event.hpp"
|
|
|
|
#include "logger.hpp"
|
2024-12-25 19:40:27 +08:00
|
|
|
// #include "coroutine_hook.hpp"
|
2024-12-20 21:17:21 +08:00
|
|
|
#include <cassert>
|
|
|
|
#include <cstring>
|
|
|
|
#include <fcntl.h>
|
|
|
|
#include <mutex>
|
|
|
|
#include <sys/epoll.h>
|
|
|
|
#include <unistd.h>
|
|
|
|
#include <sys/eventfd.h>
|
|
|
|
#include <vector>
|
|
|
|
namespace tinyrpc {
|
|
|
|
|
2024-12-25 19:40:27 +08:00
|
|
|
// extern read_fun_ptr_t g_sys_read_fun;
|
|
|
|
// extern write_fun_ptr_t g_sys_write_fun;
|
2024-12-20 21:17:21 +08:00
|
|
|
|
|
|
|
static const int EPOLL_EVENT_MAX_LEN = 16;
|
|
|
|
static thread_local Reactor *t_reactor = nullptr;
|
2024-12-25 19:40:27 +08:00
|
|
|
|
|
|
|
|
|
|
|
Reactor* Reactor::getReactor() {
|
|
|
|
if(t_reactor == nullptr) {
|
|
|
|
return t_reactor = new Reactor();
|
|
|
|
}
|
|
|
|
return t_reactor;
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2025-01-10 15:00:50 +08:00
|
|
|
Reactor::Reactor(ReactorType type) : m_type(type)
|
2024-12-20 21:17:21 +08:00
|
|
|
{
|
|
|
|
if(t_reactor != nullptr) {
|
|
|
|
logger() << "this thread has already create a reactor";
|
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
m_tid = gettid();
|
|
|
|
m_epfd = epoll_create(5);
|
|
|
|
if (m_epfd == -1) {
|
|
|
|
logger() << "epoll create error!";
|
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
m_rousefd = eventfd(0, EFD_NONBLOCK);
|
|
|
|
if (m_epfd == -1) {
|
|
|
|
logger() << "eventfd create error!";
|
|
|
|
exit(-1);
|
|
|
|
}
|
|
|
|
// FdEvent(m_rousefd).setNonblock();
|
|
|
|
addRouseFd(m_rousefd);
|
|
|
|
t_reactor = this;
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reactor::addRouseFd(int eventFd) {
|
|
|
|
// if (m_listen_fd_events.count(eventFd)) {
|
|
|
|
// logger() << "the fd already exist";
|
|
|
|
// return;
|
|
|
|
// }
|
|
|
|
epoll_event ev;
|
|
|
|
ev.events = EPOLLIN;
|
|
|
|
ev.data.fd = eventFd;
|
|
|
|
addFd(eventFd, ev);
|
|
|
|
}
|
|
|
|
|
|
|
|
bool Reactor::addFd(int fd, epoll_event ev) {
|
|
|
|
|
|
|
|
int ret = epoll_ctl(m_epfd, EPOLL_CTL_ADD, fd, &ev);
|
|
|
|
if(ret == -1) {
|
|
|
|
logger() << "epoll_ctl add ret -1 err:" << strerror(errno);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
bool Reactor::delFd(int fd) {
|
|
|
|
|
|
|
|
epoll_event ev{};
|
|
|
|
int ret = epoll_ctl(m_epfd, EPOLL_CTL_DEL, fd, &ev);
|
|
|
|
if(ret == -1) {
|
|
|
|
logger() << "epoll_ctl del ret -1 err:" << strerror(errno);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
bool Reactor::modFd(int fd, epoll_event ev) {
|
|
|
|
|
|
|
|
int ret = epoll_ctl(m_epfd, EPOLL_CTL_MOD, fd, &ev);
|
|
|
|
if(ret == -1) {
|
|
|
|
logger() << "epoll_ctl mod ret -1 err:" << strerror(errno);
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reactor::processAllTasks() {
|
|
|
|
std::vector<Task> tmpTasks;
|
|
|
|
|
|
|
|
{
|
|
|
|
std::lock_guard<std::mutex> lock(m_tasks_mtx);
|
|
|
|
tmpTasks.swap(m_tasks);
|
|
|
|
}
|
|
|
|
|
|
|
|
for(Task task : tmpTasks) {
|
|
|
|
task();
|
|
|
|
}
|
2024-12-25 19:40:27 +08:00
|
|
|
|
2024-12-20 21:17:21 +08:00
|
|
|
if(!tmpTasks.empty()) {
|
|
|
|
rouse();
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reactor::loop() {
|
|
|
|
|
|
|
|
if(m_is_looping) {
|
|
|
|
logger() << "The reactor is already looping";
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
epoll_event events[EPOLL_EVENT_MAX_LEN]{};
|
|
|
|
m_is_looping = true;
|
|
|
|
while(!m_is_stop) {
|
|
|
|
|
|
|
|
processAllTasks();
|
|
|
|
|
2025-01-10 15:00:50 +08:00
|
|
|
logger() << (m_type == Sub ? "sub " : "main ") <<"before epoll_wait";
|
2024-12-20 21:17:21 +08:00
|
|
|
int num = epoll_wait(m_epfd, events, EPOLL_EVENT_MAX_LEN, -1);
|
2025-01-10 15:00:50 +08:00
|
|
|
logger() << (m_type == Sub ? "sub " : "main ") << "wakeup";
|
2024-12-20 21:17:21 +08:00
|
|
|
|
|
|
|
if(num < 0) {
|
|
|
|
logger() << "epoll_wait ret -1 err:" << strerror(errno);
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
for(int i = 0; i < num; i++) {
|
|
|
|
int curFd = events[i].data.fd;
|
|
|
|
|
|
|
|
if(curFd == m_rousefd) {
|
|
|
|
|
|
|
|
eventfd_t val = 0;
|
|
|
|
if(eventfd_read(curFd, &val) == -1) {
|
|
|
|
logger() << "eventfd_read ret -1 err:" << strerror(errno);
|
|
|
|
}
|
|
|
|
continue;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if(m_listen_fd_events.count(curFd) == 0) {
|
|
|
|
logger() << "unknow fd:" << curFd <<", skip";
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
if(events[i].events & EPOLLIN) {
|
|
|
|
Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::READ);
|
2025-01-10 15:00:50 +08:00
|
|
|
addTask(cb);
|
2024-12-20 21:17:21 +08:00
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
if(events[i].events & EPOLLOUT) {
|
|
|
|
Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::WRITE);
|
2025-01-10 15:00:50 +08:00
|
|
|
addTask(cb);
|
2024-12-20 21:17:21 +08:00
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
m_is_looping = false;
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reactor::rouse() {
|
|
|
|
// logger() << "rouse call";
|
|
|
|
eventfd_t val = 1;
|
|
|
|
eventfd_write(m_rousefd, val);
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reactor::stop() {
|
|
|
|
if(!m_is_looping) {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
m_is_stop = false;
|
|
|
|
rouse();
|
|
|
|
}
|
2025-01-14 15:27:15 +08:00
|
|
|
void Reactor::addFdEvent(FdEvent* fdEvent) {
|
2024-12-20 21:17:21 +08:00
|
|
|
assert(fdEvent);
|
|
|
|
if (m_listen_fd_events.count(fdEvent->getFd())) {
|
|
|
|
logger() << "the fd already exist";
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
Task task = [this, &fdEvent]{
|
|
|
|
int fd = fdEvent->getFd();
|
|
|
|
int event = fdEvent->getEvent();
|
|
|
|
epoll_event ev;
|
|
|
|
ev.events = event;
|
|
|
|
ev.data.fd = fd;
|
|
|
|
addFd(fd, ev);
|
|
|
|
m_listen_fd_events.insert({fd, fdEvent});
|
|
|
|
};
|
|
|
|
|
|
|
|
if(this == t_reactor) { // 如果是同一个线程直接执行
|
|
|
|
task();
|
|
|
|
} else {
|
|
|
|
|
2025-01-10 15:00:50 +08:00
|
|
|
addTask(task);
|
2024-12-20 21:17:21 +08:00
|
|
|
|
|
|
|
}
|
|
|
|
rouse();
|
|
|
|
}
|
|
|
|
|
|
|
|
void Reactor::delFdEvent(FdEvent* fdEvent) {
|
|
|
|
assert(fdEvent);
|
|
|
|
if (m_listen_fd_events.count(fdEvent->getFd()) == 0) {
|
|
|
|
logger() << "the fd is not exist";
|
|
|
|
return ;
|
|
|
|
}
|
|
|
|
Task task = [this, &fdEvent] {
|
|
|
|
int fd = fdEvent->getFd();
|
|
|
|
delFd(fd);
|
|
|
|
m_listen_fd_events.erase(fd);
|
|
|
|
};
|
|
|
|
if(this == t_reactor) { // 如果是同一个线程直接执行
|
|
|
|
task();
|
|
|
|
} else {
|
|
|
|
|
2025-01-10 15:00:50 +08:00
|
|
|
addTask(task);
|
2024-12-20 21:17:21 +08:00
|
|
|
|
|
|
|
}
|
|
|
|
rouse();
|
|
|
|
}
|
|
|
|
|
2025-01-14 15:27:15 +08:00
|
|
|
void Reactor::modFdEvent(FdEvent* fdEvent) {
|
2024-12-20 21:17:21 +08:00
|
|
|
assert(fdEvent);
|
|
|
|
if (m_listen_fd_events.count(fdEvent->getFd()) == 0) {
|
|
|
|
logger() << "the fd is not exist";
|
2025-01-14 15:27:15 +08:00
|
|
|
addFdEvent(fdEvent);
|
2024-12-20 21:17:21 +08:00
|
|
|
return ;
|
|
|
|
}
|
|
|
|
|
|
|
|
Task task = [this, &fdEvent] {
|
|
|
|
int fd = fdEvent->getFd();
|
|
|
|
int event = fdEvent->getEvent();
|
|
|
|
epoll_event ev;
|
|
|
|
ev.events = event;
|
|
|
|
ev.data.fd = fd;
|
|
|
|
modFd(fd, ev);
|
|
|
|
};
|
|
|
|
|
|
|
|
if(this == t_reactor) { // 如果是同一个线程直接执行
|
|
|
|
task();
|
|
|
|
} else {
|
|
|
|
|
2025-01-10 15:00:50 +08:00
|
|
|
addTask(task);
|
2024-12-20 21:17:21 +08:00
|
|
|
|
|
|
|
}
|
|
|
|
rouse();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2025-01-10 15:00:50 +08:00
|
|
|
void Reactor::addTask(Task task, bool needRouse/* = false */) {
|
|
|
|
{
|
|
|
|
std::lock_guard<std::mutex> lock(m_tasks_mtx);
|
|
|
|
m_tasks.push_back(task);
|
|
|
|
}
|
|
|
|
if(needRouse)
|
|
|
|
rouse();
|
|
|
|
}
|
|
|
|
|
2024-12-20 21:17:21 +08:00
|
|
|
Reactor::~Reactor()
|
|
|
|
{
|
|
|
|
m_is_stop = true;
|
|
|
|
// rouse();
|
|
|
|
close(m_epfd);
|
|
|
|
close(m_rousefd);
|
|
|
|
t_reactor = nullptr;
|
|
|
|
}
|
|
|
|
}
|