#include "reactor.hpp" #include "fd_event.hpp" #include "logger.hpp" // #include "coroutine_hook.hpp" #include #include #include #include #include #include #include #include namespace tinyrpc { // extern read_fun_ptr_t g_sys_read_fun; // extern write_fun_ptr_t g_sys_write_fun; static const int EPOLL_EVENT_MAX_LEN = 16; static thread_local Reactor *t_reactor = nullptr; Reactor* Reactor::getReactor() { if(t_reactor == nullptr) { return t_reactor = new Reactor(); } return t_reactor; } Reactor::Reactor(ReactorType type) { if(t_reactor != nullptr) { logger() << "this thread has already create a reactor"; exit(-1); } m_tid = gettid(); m_epfd = epoll_create(5); if (m_epfd == -1) { logger() << "epoll create error!"; exit(-1); } m_rousefd = eventfd(0, EFD_NONBLOCK); if (m_epfd == -1) { logger() << "eventfd create error!"; exit(-1); } // FdEvent(m_rousefd).setNonblock(); addRouseFd(m_rousefd); t_reactor = this; } void Reactor::addRouseFd(int eventFd) { // if (m_listen_fd_events.count(eventFd)) { // logger() << "the fd already exist"; // return; // } epoll_event ev; ev.events = EPOLLIN; ev.data.fd = eventFd; addFd(eventFd, ev); } bool Reactor::addFd(int fd, epoll_event ev) { int ret = epoll_ctl(m_epfd, EPOLL_CTL_ADD, fd, &ev); if(ret == -1) { logger() << "epoll_ctl add ret -1 err:" << strerror(errno); return false; } return true; } bool Reactor::delFd(int fd) { epoll_event ev{}; int ret = epoll_ctl(m_epfd, EPOLL_CTL_DEL, fd, &ev); if(ret == -1) { logger() << "epoll_ctl del ret -1 err:" << strerror(errno); return false; } return true; } bool Reactor::modFd(int fd, epoll_event ev) { int ret = epoll_ctl(m_epfd, EPOLL_CTL_MOD, fd, &ev); if(ret == -1) { logger() << "epoll_ctl mod ret -1 err:" << strerror(errno); return false; } return true; } void Reactor::processAllTasks() { std::vector tmpTasks; { std::lock_guard lock(m_tasks_mtx); tmpTasks.swap(m_tasks); } for(Task task : tmpTasks) { task(); } if(!tmpTasks.empty()) { rouse(); } } void Reactor::loop() { if(m_is_looping) { logger() << "The reactor is already looping"; return; } epoll_event events[EPOLL_EVENT_MAX_LEN]{}; m_is_looping = true; while(!m_is_stop) { processAllTasks(); logger() << "before epoll_wait"; int num = epoll_wait(m_epfd, events, EPOLL_EVENT_MAX_LEN, -1); logger() << "wakeup"; if(num < 0) { logger() << "epoll_wait ret -1 err:" << strerror(errno); continue; } for(int i = 0; i < num; i++) { int curFd = events[i].data.fd; if(curFd == m_rousefd) { eventfd_t val = 0; if(eventfd_read(curFd, &val) == -1) { logger() << "eventfd_read ret -1 err:" << strerror(errno); } continue; } if(m_listen_fd_events.count(curFd) == 0) { logger() << "unknow fd:" << curFd <<", skip"; continue; } if(events[i].events & EPOLLIN) { Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::READ); { std::lock_guard lock(m_tasks_mtx); m_tasks.push_back(cb); } } if(events[i].events & EPOLLOUT) { Task cb = m_listen_fd_events[curFd]->getHandler(IOEvent::WRITE); { std::lock_guard lock(m_tasks_mtx); m_tasks.push_back(cb); } } } } m_is_looping = false; } void Reactor::rouse() { // logger() << "rouse call"; eventfd_t val = 1; eventfd_write(m_rousefd, val); } void Reactor::stop() { if(!m_is_looping) { return; } m_is_stop = false; rouse(); } void Reactor::addFdEvent( FdEvent* fdEvent) { assert(fdEvent); if (m_listen_fd_events.count(fdEvent->getFd())) { logger() << "the fd already exist"; return; } Task task = [this, &fdEvent]{ int fd = fdEvent->getFd(); int event = fdEvent->getEvent(); epoll_event ev; ev.events = event; ev.data.fd = fd; addFd(fd, ev); m_listen_fd_events.insert({fd, fdEvent}); }; if(this == t_reactor) { // 如果是同一个线程直接执行 task(); } else { std::lock_guard lock(m_tasks_mtx); m_tasks.push_back(task); } rouse(); } void Reactor::delFdEvent(FdEvent* fdEvent) { assert(fdEvent); if (m_listen_fd_events.count(fdEvent->getFd()) == 0) { logger() << "the fd is not exist"; return ; } Task task = [this, &fdEvent] { int fd = fdEvent->getFd(); delFd(fd); m_listen_fd_events.erase(fd); }; if(this == t_reactor) { // 如果是同一个线程直接执行 task(); } else { std::lock_guard lock(m_tasks_mtx); m_tasks.push_back(task); } rouse(); } void Reactor::modFdEvent( FdEvent* fdEvent) { assert(fdEvent); if (m_listen_fd_events.count(fdEvent->getFd()) == 0) { logger() << "the fd is not exist"; return ; } Task task = [this, &fdEvent] { int fd = fdEvent->getFd(); int event = fdEvent->getEvent(); epoll_event ev; ev.events = event; ev.data.fd = fd; modFd(fd, ev); }; if(this == t_reactor) { // 如果是同一个线程直接执行 task(); } else { std::lock_guard lock(m_tasks_mtx); m_tasks.push_back(task); } rouse(); } Reactor::~Reactor() { m_is_stop = true; // rouse(); close(m_epfd); close(m_rousefd); t_reactor = nullptr; } }