173 lines
5.5 KiB
C++
173 lines
5.5 KiB
C++
|
// #include "tcp_connection.hpp"
|
||
|
// #include "abstract_coder.hpp"
|
||
|
// #include "coroutine_hook.hpp"
|
||
|
// #include "fd_event.hpp"
|
||
|
// #include "logger.hpp"
|
||
|
// #include "reactor.hpp"
|
||
|
// #include "tcp_client.hpp"
|
||
|
// #include "tcp_server.hpp"
|
||
|
// #include "tinypb_data.hpp"
|
||
|
// #include <cerrno>
|
||
|
// #include <cstring>
|
||
|
// #include <memory>
|
||
|
// #include <pthread.h>
|
||
|
// #include <unistd.h>
|
||
|
|
||
|
|
||
|
// namespace tinyrpc {
|
||
|
// TcpConnection::TcpConnection(int fd, Reactor& reactor, TcpServer& ser, Type type) :
|
||
|
// m_fdEvent(FdEventPool::getInstance()->getFdEvent(fd)),
|
||
|
// m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)),
|
||
|
// m_reactor(reactor),
|
||
|
// m_server(ser),
|
||
|
// m_connectType(type)
|
||
|
// {
|
||
|
// Reactor::Task task = [this] {
|
||
|
// logger() << "conn coroutine is resume";
|
||
|
// m_mainCoroutine.resume();
|
||
|
// };
|
||
|
|
||
|
// reactor.addTask(task, true);
|
||
|
|
||
|
// }
|
||
|
|
||
|
|
||
|
|
||
|
// void TcpConnection::mainLoopFun() {
|
||
|
// while(m_state == State::Connected) {
|
||
|
// input();
|
||
|
// process();
|
||
|
// output();
|
||
|
// }
|
||
|
// logger() << "this conn loop has already break";
|
||
|
// }
|
||
|
|
||
|
// void TcpConnection::clearClient() {
|
||
|
// logger() << "clearClient";
|
||
|
// m_state = State::Disconnected;
|
||
|
// m_reactor.delFdEvent(m_fdEvent);
|
||
|
// m_fdEvent->reset();
|
||
|
// close(m_fdEvent->getFd());
|
||
|
// }
|
||
|
|
||
|
|
||
|
// void TcpConnection::input() { // call read_hook to read data into the application-level buffer
|
||
|
// logger() << "input";
|
||
|
// if(m_state == State::Disconnected) {
|
||
|
// logger() << "input: this conn has already break";
|
||
|
// return;
|
||
|
// }
|
||
|
|
||
|
|
||
|
// while(true) {
|
||
|
// if(m_readBuffer.getWriteable() == 0) {
|
||
|
// m_readBuffer.dilatation();
|
||
|
// }
|
||
|
|
||
|
// int ret = read_hook(m_fdEvent->getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable());
|
||
|
// if(ret == -1) {
|
||
|
// logger() << "read_hook ret -1 err:" << strerror(errno);
|
||
|
// // if(errno == EAGAIN) exit(-1);
|
||
|
// break;
|
||
|
|
||
|
// } else if(ret == 0) { // the peer closed the connection
|
||
|
// clearClient();
|
||
|
// break;
|
||
|
// } else {
|
||
|
// int writeable = m_readBuffer.getWriteable();
|
||
|
// m_readBuffer.writeOffset(ret);
|
||
|
// logger() << "input_hook ret: " << ret;
|
||
|
|
||
|
// if(ret < writeable) { // everything available has been read; exit the loop
|
||
|
// break;
|
||
|
// }
|
||
|
|
||
|
// }
|
||
|
|
||
|
// }
|
||
|
|
||
|
|
||
|
// }
|
||
|
// void TcpConnection::output() {
|
||
|
// logger() << "output";
|
||
|
// if(m_state == State::Disconnected) {
|
||
|
// return;
|
||
|
// }
|
||
|
|
||
|
// while(true) {
|
||
|
// if(m_writeBuffer.getReadable() == 0) {
|
||
|
// logger() << "no data need send";
|
||
|
// break;
|
||
|
// }
|
||
|
|
||
|
// int ret = write_hook(m_fdEvent->getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable());
|
||
|
// if(ret == -1) {
|
||
|
// logger() << "write_hook ret -1 err:" << strerror(errno);
|
||
|
// break;
|
||
|
|
||
|
// } else if(ret == 0) {
|
||
|
// logger() << "write_hook ret 0";
|
||
|
// break;
|
||
|
// } else {
|
||
|
// m_writeBuffer.readOffset(ret);
|
||
|
// }
|
||
|
// logger() << "write_hook ret: " << ret;
|
||
|
// }
|
||
|
|
||
|
|
||
|
// }
|
||
|
|
||
|
// void TcpConnection::process() {
|
||
|
// logger() << "process";
|
||
|
|
||
|
// // if(m_state == State::Disconnected) {
|
||
|
// // return;
|
||
|
// // }
|
||
|
// // if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) {
|
||
|
// // m_writeBuffer.resize(m_readBuffer.getReadable() * 2);
|
||
|
// // }
|
||
|
// // std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
|
||
|
// // m_writeBuffer.writeOffset(m_readBuffer.getReadable());
|
||
|
// // m_readBuffer.readOffset(m_readBuffer.getReadable());
|
||
|
|
||
|
// // logger() << "write data " << m_writeBuffer.getReadable() << " byte";
|
||
|
|
||
|
// while(m_readBuffer.getReadable() > 0) {
|
||
|
// std::shared_ptr<AbstractData> data(new TinypbData);
|
||
|
|
||
|
// bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
|
||
|
// if(ret == false) {
|
||
|
// logger() << "decode error";
|
||
|
// break;
|
||
|
// }
|
||
|
// if(m_connectType == Type::Server) {
|
||
|
// std::unique_ptr<AbstractData> resp(new TinypbData);
|
||
|
// m_server.getDispatcher().dispatcher(*this, *data, *resp);
|
||
|
// m_server.getCoder().encoder(m_writeBuffer, *resp);
|
||
|
// } else {
|
||
|
// std::shared_ptr<TinypbData> tmp = std::dynamic_pointer_cast<TinypbData>(data);
|
||
|
// m_respond_datas[tmp->msg_req] = data;
|
||
|
// }
|
||
|
|
||
|
|
||
|
// }
|
||
|
|
||
|
// }
|
||
|
|
||
|
// bool TcpConnection::getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct) {
|
||
|
// auto it = m_respond_datas.find(msg_req);
|
||
|
// if(it == m_respond_datas.end()) return false;
|
||
|
// pb_struct = it->second;
|
||
|
// m_respond_datas.erase(it);
|
||
|
// return true;
|
||
|
// }
|
||
|
|
||
|
// TcpConnection::~TcpConnection() {
|
||
|
// if(m_state == State::Connected) {
|
||
|
// clearClient();
|
||
|
// }
|
||
|
|
||
|
// logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor";
|
||
|
// }
|
||
|
|
||
|
// }
|