Compare commits

...

3 Commits

Author SHA1 Message Date
yhy
43c0ef3507 初步完成rpc功能 2025-02-05 20:50:31 +08:00
yhy
29c4f49f9e 忽略 2025-02-04 16:13:25 +08:00
yhy
77e37f3afc tcpConnect 拆分, tcpClient 初步 2025-02-04 16:09:27 +08:00
43 changed files with 4053 additions and 2437 deletions

4
.gitignore vendored
View File

@ -1,4 +1,8 @@
tinypb.pb.cc
tinypb.pb.h
bin/
build/
lib/
protobuf/
third_party/

View File

@ -40,17 +40,9 @@ add_library(tinyrpc
${ASM_FILES}
)
aux_source_directory(${CMAKE_SOURCE_DIR}/test/returntest TEST_SRC_LIST)
add_executable(test_tinyrpc
${TEST_SRC_LIST}
)
# abseil-cpp
add_subdirectory(./third_party/abseil-cpp absl)
set(ABSEL_LIBARARY
absl::absl_check
absl::absl_log
@ -87,13 +79,31 @@ set(ABSEL_LIBARARY
absl::utility
absl::variant
)
#
target_link_libraries(tinyrpc PRIVATE protobuf) # Protobuf
target_link_libraries(tinyrpc PRIVATE ${ABSEL_LIBARARY}) # Protobuf
aux_source_directory(${CMAKE_SOURCE_DIR}/test/servertest SER_TEST_SRC_LIST)
aux_source_directory(${CMAKE_SOURCE_DIR}/test/clienttest CLI_TEST_SRC_LIST)
add_executable(test_tinyrpc_server
${SER_TEST_SRC_LIST}
)
add_executable(test_tinyrpc_client
${CLI_TEST_SRC_LIST}
)
target_link_libraries(test_tinyrpc_server PRIVATE tinyrpc)
target_link_libraries(test_tinyrpc_client PRIVATE tinyrpc)
aux_source_directory(${CMAKE_SOURCE_DIR}/test/codertest TEST_SRC_LIST)
add_executable(test_tinyrpc
${TEST_SRC_LIST}
)
target_link_libraries(test_tinyrpc PRIVATE tinyrpc)

View File

@ -8,17 +8,16 @@ namespace tinyrpc {
friend void coFunction(Coroutine* co);
private:
Coroutine();
void operator()() const { // 调用 这个协程的回调
m_callback();
}
public:
// Coroutine(std::size_t stack_size, char* stack_sp);
Coroutine(std::function<void()> cb, std::size_t stack_size = 1 * 1024 * 1024/* , char* stack_sp */);
Coroutine(const Coroutine&) = delete;
// int getCorID() const {return m_cor_id;}
void operator()() const { // 调用 这个协程的回调
m_callback();
}
bool isMainCoroutine() const {return m_stack_sp == nullptr;}
// coctx* getContext() {return &m_ctx;}

View File

@ -18,7 +18,7 @@ namespace tinyrpc {
AbstractCoder() = default;
virtual ~AbstractCoder() = default;
virtual bool encoder(TcpBuffer& buffer, AbstractData& data) = 0; // 编码
virtual bool encoder(TcpBuffer& buffer, const AbstractData& data) = 0; // 编码
virtual bool decoder(TcpBuffer& buffer, AbstractData& data) = 0; // 解码

View File

@ -1,7 +1,8 @@
#pragma once
#include "abstract_coder.hpp"
#include "tcp_connection.hpp"
#include "server_tcp_connect.hpp"
namespace tinyrpc {
@ -10,7 +11,7 @@ namespace tinyrpc {
public:
AbstractDispatcher() = default;
virtual ~AbstractDispatcher() = default;
virtual void dispatcher(TcpConnection& conn, AbstractData& data, AbstractData& respond) = 0;
virtual void dispatcher(ServerTcpConnection& conn, AbstractData& data, AbstractData& respond) = 0;
private:

View File

@ -3,7 +3,8 @@
namespace tinyrpc {
enum ErrorCode {
ERROR_PEER_CLOSED, // connect when peer close
ERROR_PEER_CLOSED = 1, // connect when peer close
ERROR_FAILED_CONNECT, // failed to connection peer host
ERROR_FAILED_GET_REPLY, // failed to get server reply
ERROR_FAILED_DESERIALIZE, // deserialize failed
@ -19,7 +20,7 @@ enum ErrorCode {
ERROR_METHOD_NOT_FOUND, // not found method
ERROR_PARSE_SERVICE_NAME, // not found service name
ERROR_ASYNC_RPC_CALL_SINGLE_IOTHREAD, // not supoort async rpc call when only have single iothread
ERROR_ASYNC_RPC_CALL_SINGLE_IOTHREAD = 12, // not supoort async rpc call when only have single iothread
};

View File

@ -0,0 +1,7 @@
#pragma once
namespace tinyrpc {
enum class ProtocolType{
Tinypb,
};
}

View File

@ -0,0 +1,47 @@
#pragma once
#include "coroutine.hpp"
#include "fd_event.hpp"
#include "reactor.hpp"
#include "tcp_buffer.hpp"
namespace tinyrpc {
class TcpServer;
class AbstractTcpConnection {
public:
enum class State{
Disconnected,
Connected
};
// enum class Type{
// Server,
// Client
// };
public:
AbstractTcpConnection(int fd, Reactor& reactor, State state = State::Connected);
void clearClient();
void mainLoopFun();
State getState() {return m_state;}
// bool getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct); //cli
virtual ~AbstractTcpConnection();
void addMainTaskToReactor();
public:
void input();
void output();
virtual void process() = 0;
protected:
FdEvent *m_fdEvent;
Coroutine m_mainCoroutine;
State m_state{State::Connected};
TcpBuffer m_writeBuffer{};
TcpBuffer m_readBuffer{};
Reactor& m_reactor;
// TcpServer& m_server;
// TcpClient& m_server;
// std::unordered_map<std::string, std::shared_ptr<AbstractData>> m_respond_datas; // cli
};
}

View File

@ -0,0 +1,26 @@
#pragma once
#include "abstract_coder.hpp"
#include "abstract_tcp_connect.hpp"
#include "tcp_buffer.hpp"
#include <memory>
namespace tinyrpc {
class TcpClient;
class ClientTcpConnection : public AbstractTcpConnection {
public:
ClientTcpConnection(int fd, Reactor& reactor, TcpClient& cli);
~ClientTcpConnection();
TcpBuffer& getSendBuffer() {return m_writeBuffer;}
bool getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct);
void process() override;
private:
TcpClient& m_client;
std::unordered_map<std::string, std::shared_ptr<AbstractData>> m_respond_datas; // cli
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include "abstract_tcp_connect.hpp"
#include "reactor.hpp"
#include "tcp_connection.hpp"
#include "server_tcp_connect.hpp"
#include <mutex>
#include <thread>
#include <unordered_map>
@ -21,7 +22,7 @@ namespace tinyrpc {
~IOThread();
void mainFunc();
private:
std::unordered_map<int, std::shared_ptr<TcpConnection>> m_clients;
std::unordered_map<int, std::shared_ptr<ServerTcpConnection>> m_clients;
std::thread m_thread;
Reactor* m_reactor{nullptr};
};

View File

@ -0,0 +1,23 @@
#pragma once
#include "abstract_tcp_connect.hpp"
#include "reactor.hpp"
namespace tinyrpc {
class TcpServer;
class ServerTcpConnection : public AbstractTcpConnection {
public:
ServerTcpConnection(int fd, Reactor& reactor, TcpServer& ser);
~ServerTcpConnection();
private:
void process() override;
private:
TcpServer& m_server;
};
}

View File

@ -1,4 +1,5 @@
#pragma once
#include "logger.hpp"
#include <cstddef>
#include <cstring>
#include <vector>
@ -14,7 +15,13 @@ namespace tinyrpc {
~TcpBuffer() {
}
void reserved(std::size_t spaceSize) { // 预留空间
if(getWriteable() <= spaceSize) {
resize((getReadable() + spaceSize) * 2);
}
}
void dilatation() {
resize(m_buffer.size() * 2);
}

View File

@ -0,0 +1,33 @@
#pragma once
#include "abstract_coder.hpp"
#include "client_tcp_connect.hpp"
#include "coroutine.hpp"
#include "net_address.hpp"
#include "reactor.hpp"
#include "tinypb_data.hpp"
#include <memory>
namespace tinyrpc {
class TcpClient {
public:
TcpClient(const NetAddress& peerAddr);
AbstractCoder& getCoder() {return *m_coder;}
const NetAddress& getLocalAddr() const {return m_local_addr;}
const NetAddress& getPeerAddr() const {return m_peer_addr;}
bool writeToSendBuffer(const AbstractData& data);
int sendAndRecvData(const std::string& msg_req, std::shared_ptr<AbstractData>& res);
void addCoroutine(Coroutine& cor);
bool connectToServer();
void start();
~TcpClient();
private:
int m_fd{-1};
NetAddress m_local_addr{};
NetAddress m_peer_addr{};
Reactor& m_reactor;
std::unique_ptr<ClientTcpConnection> m_connection;
AbstractCoder* m_coder{};
};
}

View File

@ -1,42 +0,0 @@
#pragma once
#include "abstract_coder.hpp"
#include "coroutine.hpp"
#include "fd_event.hpp"
#include "reactor.hpp"
#include "tcp_buffer.hpp"
namespace tinyrpc {
class TcpServer;
class TcpConnection {
public:
enum class State{
Disconnected,
Connected
};
public:
TcpConnection(int fd, Reactor& reactor, TcpServer& ser);
void clearClient();
void mainLoopFun();
~TcpConnection();
private:
void input();
void output();
void process();
private:
FdEvent *m_fdEvent;
Coroutine m_mainCoroutine;
State m_state{State::Connected};
TcpBuffer m_writeBuffer{};
TcpBuffer m_readBuffer{};
Reactor& m_reactor;
TcpServer& m_server;
};
}

View File

@ -4,7 +4,9 @@
#include "coroutine.hpp"
#include "io_thread.hpp"
#include "net_address.hpp"
#include "protocol_type.hpp"
#include <cstdint>
#include <google/protobuf/service.h>
// #include "reactor.hpp"
namespace tinyrpc {
@ -28,6 +30,7 @@ namespace tinyrpc {
void start();
AbstractCoder& getCoder() {return *m_coder;}
AbstractDispatcher& getDispatcher() {return *m_dispatcher;}
void registerService(std::shared_ptr<google::protobuf::Service> service);
private:
void mainAcceptCorFun();
private:
@ -41,6 +44,7 @@ namespace tinyrpc {
IOThreadPool m_ioThreadPool{4};
AbstractCoder* m_coder{};
AbstractDispatcher* m_dispatcher{};
ProtocolType m_protocolType{ProtocolType::Tinypb};
};
}

View File

@ -0,0 +1,23 @@
#pragma once
#include "net_address.hpp"
#include "tcp_client.hpp"
#include <google/protobuf/service.h>
namespace tinyrpc {
class TinypbChannel : public google::protobuf::RpcChannel {
public:
TinypbChannel(const NetAddress& peerAddr);
~TinypbChannel() = default;
void CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done) override;
private:
TcpClient m_client;
};
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <google/protobuf/stubs/callback.h>
#include <functional>
namespace tinyrpc {
class TinypbClosure : public google::protobuf::Closure {
public:
explicit TinypbClosure(const std::function<void()>& cb) : m_callback(cb){}
~TinypbClosure() = default;
void Run() override {
if(m_callback) {
m_callback();
}
}
private:
std::function<void()> m_callback{};
};
}

View File

@ -7,7 +7,7 @@ namespace tinyrpc {
public:
TinypbCoder();
~TinypbCoder();
bool encoder(TcpBuffer& buffer, AbstractData& data) override; // 编码
bool encoder(TcpBuffer& buffer, const AbstractData& data) override; // 编码
bool decoder(TcpBuffer& buffer, AbstractData& data) override; // 解码
private:

View File

@ -0,0 +1,90 @@
#pragma once
#include "net_address.hpp"
#include <google/protobuf/service.h>
namespace tinyrpc {
class TinypbController : public google::protobuf::RpcController {
public:
TinypbController() = default;
~TinypbController() = default;
void Reset() override{}
bool Failed() const override{return m_is_failed;}
// Server-side methods ---------------------------------------------
std::string ErrorText() const override {return m_error_info;}
void StartCancel() override{}
void SetFailed(const std::string& reason) override {
m_is_failed = true;
m_error_info = reason;
}
bool IsCanceled() const override {
return m_is_canceled;
}
void NotifyOnCancel(google::protobuf::Closure* callback) override {}
// common methods
int ErrorCode() const {return m_error_code;}
void SetErrorCode(const int error_code) {m_error_code = error_code;}
void SetError(const int err_code, const std::string& err_info) {
SetFailed(err_info);
SetErrorCode(err_code);
}
void SetPeerAddr(const NetAddress& addr) {
m_peer_addr = addr;
}
void SetLocalAddr(const NetAddress& addr) {
m_local_addr = addr;
}
const NetAddress& PeerAddr() {return m_peer_addr;}
const NetAddress& LocalAddr(){return m_local_addr;}
void SetTimeout(const int timeout) {m_timeout = timeout;}
int Timeout() const {return m_timeout;}
void SetMsgReq(const std::string& msg_req ) {m_msg_req = msg_req;}
void SetMethodName(const std::string& method_name ) {m_method_name = method_name;}
void SetFullName(const std::string& full_name ) {m_full_name = full_name;}
const std::string GetMsgReq() const {return m_msg_req;}
const std::string GetMethodName() const {return m_method_name;}
const std::string GetFullName() const {return m_full_name;}
private:
int m_error_code {0};
bool m_is_failed {false};
bool m_is_canceled {false};
std::string m_error_info{};
std::string m_msg_req{};
std::string m_method_name{};
std::string m_full_name{};
int m_timeout {5000};
NetAddress m_peer_addr{};
NetAddress m_local_addr{};
};
}

View File

@ -1,31 +1,73 @@
#pragma once
#include "abstract_coder.hpp"
#include <cstdint>
#include <string>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
namespace tinyrpc {
static thread_local std::string t_msg_req_nu;
static thread_local std::string t_max_msg_req_nu;
static int g_random_fd = -1;
struct TinypbData : public AbstractData {
static std::string genMsgNumber()
{
int t_msg_req_len = 8;
struct TinypbData : public AbstractData {
if (t_msg_req_nu.empty() || t_msg_req_nu == t_max_msg_req_nu) {
if (g_random_fd == -1) {
g_random_fd = open("/dev/urandom", O_RDONLY);
}
std::string res(t_msg_req_len, 0);
TinypbData() {};
~TinypbData() {};
if ((read(g_random_fd, &res[0], t_msg_req_len)) != t_msg_req_len) {
return "";
}
t_max_msg_req_nu = "";
// char start = 0x02; // indentify start of a TinyPb protocal data
int32_t pk_len {0}; // len of all package(include start char and end char)
int32_t msg_req_len {0}; // len of msg_req
std::string msg_req; // msg_req, which identify a request
int32_t service_name_len {0}; // len of service full name
std::string service_full_name; // service full name, like QueryService.query_name
int32_t err_code {0}; // err_code, 0 -- call rpc success, otherwise -- call rpc failed. it only be seted by RpcController
int32_t err_info_len {0}; // len of err_info
std::string err_info; // err_info, empty -- call rpc success, otherwise -- call rpc failed, it will display details of reason why call rpc failed. it only be seted by RpcController
std::string pb_data; // business pb data
int32_t check_num {-1}; // check_num of all package. to check legality of data
// char end = 0x03; // identify end of a TinyPb protocal data
};
for (int i = 0; i < t_msg_req_len; ++i) {
uint8_t x = ((uint8_t)(res[i])) % 10;
res[i] = x + '0';
t_max_msg_req_nu += "9";
}
t_msg_req_nu = res;
} else {
int i = t_msg_req_nu.length() - 1;
while (t_msg_req_nu[i] == '9' && i >= 0) {
i--;
}
if (i >= 0) {
t_msg_req_nu[i] += 1;
for (size_t j = i + 1; j < t_msg_req_nu.length(); ++j) {
t_msg_req_nu[j] = '0';
}
}
}
return t_msg_req_nu;
}
TinypbData() {};
~TinypbData() {};
// char start = 0x02; // indentify start of a TinyPb protocal data
int32_t pk_len { 0 }; // len of all package(include start char and end char)
int32_t msg_req_len { 0 }; // len of msg_req
std::string msg_req; // msg_req, which identify a request
int32_t service_name_len { 0 }; // len of service full name
std::string service_full_name; // service full name, like QueryService.query_name
int32_t err_code { 0 }; // err_code, 0 -- call rpc success, otherwise -- call rpc failed. it only be seted by RpcController
int32_t err_info_len { 0 }; // len of err_info
std::string err_info; // err_info, empty -- call rpc success, otherwise -- call rpc failed, it will display details of reason why call rpc failed. it only be seted by RpcController
std::string pb_data; // business pb data
int32_t check_num { -1 }; // check_num of all package. to check legality of data
// char end = 0x03; // identify end of a TinyPb protocal data
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include "abstract_dispatcher.hpp"
#include <memory>
#include <unordered_map>
#include <google/protobuf/message.h>
#include <google/protobuf/service.h>
@ -15,10 +16,11 @@ namespace tinyrpc {
public:
TinypbDispatcher();
~TinypbDispatcher();
void dispatcher(TcpConnection& conn, AbstractData& data, AbstractData& respond) override;
void dispatcher(ServerTcpConnection& conn, AbstractData& data, AbstractData& respond) override;
bool parseServiceFullName(const std::string& name, std::string& serviceName, std::string& methodName);
void registerService(std::shared_ptr<Service>& service);
private:
std::unordered_map<std::string, Service*> m_service_map;
std::unordered_map<std::string, std::shared_ptr<Service>> m_service_map;
};

View File

@ -1,35 +1,31 @@
#include "tcp_connection.hpp"
#include "abstract_coder.hpp"
#include "abstract_tcp_connect.hpp"
#include "coroutine_hook.hpp"
#include "fd_event.hpp"
#include "logger.hpp"
#include "reactor.hpp"
#include "tcp_server.hpp"
#include "tinypb_data.hpp"
#include <cerrno>
#include <cstring>
#include <memory>
#include <pthread.h>
#include <unistd.h>
namespace tinyrpc {
TcpConnection::TcpConnection(int fd, Reactor& reactor, TcpServer& ser) :
AbstractTcpConnection::AbstractTcpConnection(int fd, Reactor& reactor, State state) :
m_fdEvent(FdEventPool::getInstance()->getFdEvent(fd)),
m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)),
m_reactor(reactor),
m_server(ser)
m_mainCoroutine(std::bind(&AbstractTcpConnection::mainLoopFun, this)),
m_state(state),
m_reactor(reactor)
{
}
void AbstractTcpConnection::addMainTaskToReactor() {
Reactor::Task task = [this] {
logger() << "conn coroutine is resume";
m_mainCoroutine.resume();
};
reactor.addTask(task, true);
m_reactor.addTask(task, true);
}
void TcpConnection::mainLoopFun() {
void AbstractTcpConnection::mainLoopFun() {
while(m_state == State::Connected) {
input();
process();
@ -38,7 +34,7 @@ namespace tinyrpc {
logger() << "this conn loop has already break";
}
void TcpConnection::clearClient() {
void AbstractTcpConnection::clearClient() {
logger() << "clearClient";
m_state = State::Disconnected;
m_reactor.delFdEvent(m_fdEvent);
@ -47,7 +43,7 @@ namespace tinyrpc {
}
void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
void AbstractTcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
logger() << "input";
if(m_state == State::Disconnected) {
logger() << "input: this conn has already break";
@ -84,7 +80,7 @@ namespace tinyrpc {
}
void TcpConnection::output() {
void AbstractTcpConnection::output() {
logger() << "output";
if(m_state == State::Disconnected) {
return;
@ -113,42 +109,50 @@ namespace tinyrpc {
}
void TcpConnection::process() {
logger() << "process";
// if(m_state == State::Disconnected) {
// return;
// }
// if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) {
// m_writeBuffer.resize(m_readBuffer.getReadable() * 2);
// }
// std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
// m_writeBuffer.writeOffset(m_readBuffer.getReadable());
// m_readBuffer.readOffset(m_readBuffer.getReadable());
// void AbstractTcpConnection::process() {
// logger() << "process";
// logger() << "write data " << m_writeBuffer.getReadable() << " byte";
// // if(m_state == State::Disconnected) {
// // return;
// // }
// // if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) {
// // m_writeBuffer.resize(m_readBuffer.getReadable() * 2);
// // }
// // std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
// // m_writeBuffer.writeOffset(m_readBuffer.getReadable());
// // m_readBuffer.readOffset(m_readBuffer.getReadable());
while(m_readBuffer.getReadable() > 0) {
std::unique_ptr<AbstractData> data(new TinypbData);
// // logger() << "write data " << m_writeBuffer.getReadable() << " byte";
bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
if(ret == false) {
logger() << "decode error";
break;
}
std::unique_ptr<AbstractData> resp(new TinypbData);
m_server.getDispatcher().dispatcher(*this, *data, *resp);
// while(m_readBuffer.getReadable() > 0) {
// std::shared_ptr<AbstractData> data(new TinypbData);
m_server.getCoder().encoder(m_writeBuffer, *resp);
// bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
// if(ret == false) {
// logger() << "decode error";
// break;
// }
// if(m_connectType == Type::Server) {
// std::unique_ptr<AbstractData> resp(new TinypbData);
// m_server.getDispatcher().dispatcher(*this, *data, *resp);
// m_server.getCoder().encoder(m_writeBuffer, *resp);
// } else {
// std::shared_ptr<TinypbData> tmp = std::dynamic_pointer_cast<TinypbData>(data);
// m_respond_datas[tmp->msg_req] = data;
// }
}
}
// }
TcpConnection::~TcpConnection() {
// }
AbstractTcpConnection::~AbstractTcpConnection() {
if(m_state == State::Connected) {
clearClient();
}
logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor";
logger() << "AbstractTcpConnection fd " << m_fdEvent->getFd() << " destructor";
}
}

View File

@ -0,0 +1,46 @@
#include "client_tcp_connect.hpp"
#include "abstract_tcp_connect.hpp"
#include "logger.hpp"
#include "tcp_client.hpp"
#include "tinypb_data.hpp"
#include <memory>
namespace tinyrpc {
ClientTcpConnection::ClientTcpConnection(int fd, Reactor& reactor, TcpClient& cli) :
AbstractTcpConnection(fd, reactor),
m_client(cli)
{
}
ClientTcpConnection::~ClientTcpConnection() {
}
void ClientTcpConnection::process() {
while(m_readBuffer.getReadable() > 0) {
std::shared_ptr<AbstractData> data(new TinypbData);
bool ret = m_client.getCoder().decoder(m_readBuffer, *data);
if(ret == false) {
logger() << "decode error";
break;
}
std::shared_ptr<TinypbData> tmp = std::dynamic_pointer_cast<TinypbData>(data);
m_respond_datas[tmp->msg_req] = data;
}
}
bool ClientTcpConnection::getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct) {
auto it = m_respond_datas.find(msg_req);
if(it == m_respond_datas.end()) return false;
pb_struct = it->second;
m_respond_datas.erase(it);
return true;
}
}

View File

@ -2,7 +2,7 @@
#include "logger.hpp"
#include "reactor.hpp"
#include "coroutine.hpp"
#include "tcp_connection.hpp"
#include "server_tcp_connect.hpp"
#include "tcp_server.hpp"
#include <memory>
#include <mutex>
@ -25,9 +25,8 @@ namespace tinyrpc {
void IOThread::addClient(TcpServer* ser, int fd) {
m_clients[fd] = std::make_shared<TcpConnection>(fd, *m_reactor, *ser);
m_clients[fd] = std::shared_ptr<ServerTcpConnection>(new ServerTcpConnection(fd, *m_reactor, *ser));
m_clients[fd]->addMainTaskToReactor();
}
void IOThread::mainFunc() {

View File

@ -0,0 +1,35 @@
#include "server_tcp_connect.hpp"
#include "abstract_tcp_connect.hpp"
#include "logger.hpp"
#include "tcp_server.hpp"
#include "tinypb_data.hpp"
namespace tinyrpc {
ServerTcpConnection::ServerTcpConnection(int fd, Reactor& reactor, TcpServer& ser) :
AbstractTcpConnection(fd, reactor),
m_server(ser)
{
}
ServerTcpConnection::~ServerTcpConnection() {
}
void ServerTcpConnection::process() {
while(m_readBuffer.getReadable() > 0) {
std::shared_ptr<AbstractData> data(new TinypbData);
bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
if(ret == false) {
logger() << "decode uncompleted";
break;
}
std::unique_ptr<AbstractData> resp(new TinypbData);
m_server.getDispatcher().dispatcher(*this, *data, *resp);
m_server.getCoder().encoder(m_writeBuffer, *resp);
}
}
}

88
src/net/tcp/tcp_client.cc Normal file
View File

@ -0,0 +1,88 @@
#include "tcp_client.hpp"
#include "abstract_tcp_connect.hpp"
#include "client_tcp_connect.hpp"
#include "coroutine.hpp"
#include "error_code.hpp"
#include "fd_event.hpp"
#include "logger.hpp"
#include "net_address.hpp"
#include "reactor.hpp"
#include "tinypb_coder.hpp"
#include <unistd.h>
namespace tinyrpc {
TcpClient::TcpClient(const NetAddress& peerAddr) :
m_local_addr("127.0.0.1", 0),
m_peer_addr(peerAddr),
m_reactor(*Reactor::getReactor())
{
m_fd = socket(AF_INET, SOCK_STREAM, 0);
if (m_fd == -1) {
logger() << "call socket error, fd=-1, sys error=" << strerror(errno);
}
m_coder = new TinypbCoder();
// m_connection = new ClientTcpConnection(m_fd, m_reactor, *this);
}
TcpClient::~TcpClient() {
m_reactor.delFdEvent(FdEventPool::getInstance()->getFdEvent(m_fd));
if(m_fd != -1) close(m_fd);
delete m_coder;
}
bool TcpClient::connectToServer() {
if(m_connection.get() == nullptr || m_connection->getState() == ClientTcpConnection::State::Disconnected) {
int ret = connect(m_fd, m_peer_addr.getSockaddr(), m_peer_addr.getSockLen());
if(ret == -1) return false;
m_connection.reset(new ClientTcpConnection(m_fd, m_reactor, *this));
}
return true;
}
bool TcpClient::writeToSendBuffer(const AbstractData& data) {
connectToServer();
return m_coder->encoder(m_connection->getSendBuffer(), data);
}
int TcpClient::sendAndRecvData(const std::string& msg_req, std::shared_ptr<AbstractData>& res) {
if(!connectToServer()) {
logger() << "error1";
return ERROR_FAILED_CONNECT;
}
m_connection->output();
if(m_connection->getState() == ClientTcpConnection::State::Disconnected) {
logger() << "error1";
return ERROR_FAILED_GET_REPLY;
}
do {
m_connection->input();
m_connection->process();
if(m_connection->getState() != ClientTcpConnection::State::Connected) {
logger() << "error1";
return ERROR_FAILED_GET_REPLY;
}
}while(!m_connection->getResPackageData(msg_req, res));
return 0;
}
void TcpClient::addCoroutine(Coroutine& cor) {
Reactor::Task task = [&cor] {
cor.resume();
};
m_reactor.addTask(task);
}
void TcpClient::start() {
Coroutine::getMainCoroutine();
m_reactor.loop();
}
}

View File

@ -4,6 +4,7 @@
#include "logger.hpp"
#include "coroutine_hook.hpp"
#include "net_address.hpp"
#include "protocol_type.hpp"
#include "reactor.hpp"
#include "tinypb_coder.hpp"
#include "tinypb_dispatcher.hpp"
@ -103,4 +104,12 @@ namespace tinyrpc {
}
}
void TcpServer::registerService(std::shared_ptr<google::protobuf::Service> service) {
if(m_protocolType != ProtocolType::Tinypb) return;
if(service) {
dynamic_cast<TinypbDispatcher*>(m_dispatcher)->registerService(service);
}
}
}

View File

@ -0,0 +1,77 @@
#include "tinypb_channel.hpp"
#include "error_code.hpp"
#include "logger.hpp"
#include "tinypb_controller.hpp"
#include "tinypb_data.hpp"
#include <google/protobuf/message.h>
#include <memory>
namespace tinyrpc {
TinypbChannel::TinypbChannel(const NetAddress& peerAddr) :
m_client(peerAddr)
{
}
void TinypbChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
google::protobuf::RpcController* controller,
const google::protobuf::Message* request,
google::protobuf::Message* response,
google::protobuf::Closure* done)
{
TinypbController* rpc_controller = dynamic_cast<TinypbController*>(controller);
rpc_controller->SetLocalAddr(m_client.getLocalAddr());
rpc_controller->SetPeerAddr(m_client.getPeerAddr());
std::unique_ptr<TinypbData> data(new TinypbData);
data->service_full_name = method->full_name();
if (!request->SerializeToString(&(data->pb_data))) {
logger() << "serialize send package error";
return;
}
data->msg_req = TinypbData::genMsgNumber();
data->msg_req_len = data->msg_req.length();
rpc_controller->SetMsgReq(data->msg_req);
m_client.writeToSendBuffer(*data);
std::shared_ptr<AbstractData> res;
int ret = m_client.sendAndRecvData(data->msg_req, res);
if(ret != 0) {
rpc_controller->SetError(ret, "sendAndRecvData err");
return;
}
std::shared_ptr<TinypbData> res_data = std::dynamic_pointer_cast<TinypbData>(res);
// CONTINUE
if(!response->ParseFromString(res_data->pb_data)) {
rpc_controller->SetError(ERROR_FAILED_DESERIALIZE, "failed to deserialize data from server");
return;
}
if(res_data->err_code != 0) {
rpc_controller->SetError(res_data->err_code, res_data->err_info);
return;
}
logger() << "============================================================";
logger() << data->msg_req << "|" << rpc_controller->PeerAddr().toString()
<< "|call rpc server [" << data->service_full_name << "] succ"
<< ". Get server reply response data:" << response->ShortDebugString();
logger() << "============================================================";
if(done) done->Run();
}
}

View File

@ -1,12 +1,11 @@
#include "tinypb_coder.hpp"
#include "abstract_coder.hpp"
#include "logger.hpp"
#include "tinypb_data.hpp"
#include <cstdint>
#include <cstring>
#include <memory>
#include <netinet/in.h>
#include <new>
#include <vector>
namespace tinyrpc {
@ -16,13 +15,16 @@ namespace tinyrpc {
// static const int MSG_REQ_LEN = 20; // default length of msg_req
TinypbCoder::TinypbCoder() {
// TODO
}
TinypbCoder::~TinypbCoder() {
// TODO
}
bool TinypbCoder::encoder(TcpBuffer& buffer, AbstractData& data) {
TinypbData& pbdata = dynamic_cast<TinypbData&>(data);
bool TinypbCoder::encoder(TcpBuffer& buffer, const AbstractData& data) {
logger() << "encoder";
const TinypbData& pbdata = dynamic_cast<const TinypbData&>(data);
//TinypbData->encode_succ = false; TODO?
if(pbdata.msg_req.empty()) return false;
if(pbdata.service_full_name.empty()) return false;
if(pbdata.pb_data.empty()) return false;
@ -52,7 +54,7 @@ namespace tinyrpc {
int32_t service_name_len_net = htonl(service_name_len);
memcpy(&buf[cur_index], &service_name_len_net, sizeof(service_name_len_net));
cur_index += sizeof(service_name_len_net);
memcpy(&buf[cur_index], pbdata.msg_req.c_str(), service_name_len);
memcpy(&buf[cur_index], pbdata.service_full_name.c_str(), service_name_len);
cur_index += service_name_len;
int32_t err_code = pbdata.err_code;
@ -67,25 +69,28 @@ namespace tinyrpc {
memcpy(&buf[cur_index], pbdata.err_info.c_str(), err_info_len);
cur_index += err_info_len;
int32_t pb_data_len = pbdata.err_info.length();
int32_t pb_data_len = pbdata.pb_data.length();
memcpy(&buf[cur_index], pbdata.pb_data.c_str(), pb_data_len);
cur_index += pb_data_len;
int32_t check_num = 1;
int32_t check_num = 1; // checksum has not been implemented yet, directly skip chcksum
int32_t check_num_net = htonl(check_num);
memcpy(&buf[cur_index], &check_num_net, sizeof(check_num_net));
cur_index += sizeof(check_num_net);
buf[cur_index++] = PB_END;
memcpy(buffer.getWriteAddress(), buf.get(), pk_len);
buffer.reserved(pk_len);
memcpy(buffer.getWriteAddress(), buf.get(), pk_len); // 预留
buffer.writeOffset(pk_len);
return true;
}
bool TinypbCoder::decoder(TcpBuffer& buffer, AbstractData& data) {
logger() << "decoder";
char* buff = static_cast<char*>(buffer.getReadAddress());
int start_index = -1;
@ -94,30 +99,33 @@ namespace tinyrpc {
int pack_len = -1;
for(int i = 0; i < static_cast<int>(buffer.getReadable()); i++) {
if(buff[i] == PB_START) {
if(i + 1 >= static_cast<int>(buffer.getReadable())) {
return false; // 包不完整
}
pack_len = getInt32FromNetByte(buff[i + 1]);
end_index = pack_len + i - 1;
if(end_index >= static_cast<int>(buffer.getReadable())) {
continue;
}
if(buff[end_index] == PB_END) {
isFullPack = true;
start_index = i;
break;
}
if(buff[i] != PB_START) continue;
if(i + 1 >= static_cast<int>(buffer.getReadable())) {
return false; // 包不完整
}
pack_len = getInt32FromNetByte(buff[i + 1]);
end_index = pack_len + i - 1;
if(end_index >= static_cast<int>(buffer.getReadable())) {
continue; // 不符合格式
}
if(buff[end_index] != PB_END) {
continue; // 不符合格式
}
isFullPack = true;
start_index = i;
break;
}
if(isFullPack == false) {
return false; // 包不完整
}
@ -207,7 +215,7 @@ namespace tinyrpc {
pbdata.pb_data = std::string(&buff[cur_index], pb_data_len);
cur_index += pb_data_len;
buffer.readOffset(cur_index - start_index);
buffer.readOffset(end_index + 1);
return true;

View File

@ -1,8 +1,9 @@
#include "tinypb_dispatcher.hpp"
#include "logger.hpp"
#include "tinypb_closure.hpp"
#include "tinypb_controller.hpp"
#include "tinypb_data.hpp"
#include "error_code.hpp"
#include <cstddef>
#include <memory>
#include <sstream>
@ -12,16 +13,22 @@ namespace tinyrpc {
TinypbDispatcher::~TinypbDispatcher() {
}
void TinypbDispatcher::dispatcher(TcpConnection& conn, AbstractData& data, AbstractData& resp) {
void TinypbDispatcher::dispatcher(ServerTcpConnection& conn, AbstractData& data, AbstractData& resp) {
logger() << "dispatcher";
TinypbData& pbdata = dynamic_cast<TinypbData&>(data);
TinypbData& request = dynamic_cast<TinypbData&>(data);
TinypbData& respond = dynamic_cast<TinypbData&>(resp);
std::string service_name;
std::string method_name;
respond.service_full_name = pbdata.service_full_name;
respond.msg_req = pbdata.msg_req;
bool ret = parseServiceFullName(pbdata.service_full_name, service_name, method_name);
respond.service_full_name = request.service_full_name;
respond.msg_req = request.msg_req;
logger() << "request service_full_name:" << "[" << request.service_full_name << "]";
logger() << "request msg_req:" << "[" << request.msg_req << "]";
bool ret = parseServiceFullName(request.service_full_name, service_name, method_name);
if(ret == false) {
respond.err_code = ERROR_PARSE_SERVICE_NAME;
std::stringstream ss;
@ -29,6 +36,7 @@ namespace tinyrpc {
respond.err_info = ss.str();
return;
}
logger() << "request method_name:" << "[" << method_name << "]";
auto it = m_service_map.find(service_name);
@ -41,7 +49,7 @@ namespace tinyrpc {
}
Service* service = it->second;
std::shared_ptr<Service> service = it->second;
const Method* method = service->GetDescriptor()->FindMethodByName(method_name);
// const Method* method = nullptr;
@ -54,7 +62,7 @@ namespace tinyrpc {
}
std::unique_ptr<Message> requestMsg (service->GetRequestPrototype(method).New());
ret = requestMsg->ParseFromString(pbdata.pb_data);
ret = requestMsg->ParseFromString(request.pb_data);
if(ret == false) {
respond.err_code = ERROR_FAILED_SERIALIZE;
@ -64,7 +72,7 @@ namespace tinyrpc {
return;
}
std::unique_ptr<Message> respondMsg (service->GetRequestPrototype(method).New());
std::unique_ptr<Message> respondMsg (service->GetResponsePrototype(method).New());
auto callback = [&respond, &respondMsg] {
if(!respondMsg->SerializePartialToString(&respond.pb_data)) {
@ -77,7 +85,13 @@ namespace tinyrpc {
}
};
service->CallMethod(method, nullptr, requestMsg.get(), respondMsg.get(), nullptr /* callback */);
std::unique_ptr<TinypbController> rpcController(new TinypbController);
rpcController->SetMsgReq(respond.msg_req);
rpcController->SetFullName(respond.service_full_name);
rpcController->SetMethodName(method_name);
TinypbClosure done(callback);
service->CallMethod(method, rpcController.get(), requestMsg.get(), respondMsg.get(), &done /* callback */);
}
@ -89,10 +103,16 @@ namespace tinyrpc {
serviceName = name.substr(0, pos);
methodName = name.substr(pos + 1);
logger() << "serviceName=" << serviceName;
logger() << "methodName=" << methodName;
// logger() << "serviceName=" << serviceName;
// logger() << "methodName=" << methodName;
return true;
}
void TinypbDispatcher::registerService(std::shared_ptr<Service>& service) {
std::string service_name = service->GetDescriptor()->full_name();
m_service_map[service_name] = service;
logger() << "success register service:" << "[" << service_name << "]";
}
}

View File

@ -0,0 +1,66 @@
#include "coroutine.hpp"
#include "logger.hpp"
#include "net_address.hpp"
#include "reactor.hpp"
#include "tcp_client.hpp"
#include "tcp_server.hpp"
#include "tinypb.pb.h"
#include "tinypb_channel.hpp"
#include "tinypb_closure.hpp"
#include "tinypb_controller.hpp"
#include <iostream>
using namespace std;
using namespace tinyrpc;
int n = 10;
void test()
{
NetAddress addr("127.0.0.1", 9001);
TinypbChannel channel(addr);
while (n--) {
logger() << "============== test no:" << n << "===============";
queryNameReq req_name;
req_name.set_req_no(20220315);
req_name.set_id(1100110001);
req_name.set_type(1);
queryNameRes res_name;
queryAgeReq req_age;
req_age.set_req_no(00001111);
req_age.set_id(6781);
queryAgeRes res_age;
TinypbClosure cb([]() {
logger() << "==========================";
logger() << "succ call rpc";
logger() << "==========================";
});
QueryService_Stub stub(&channel);
TinypbController rpc_controller;
stub.query_name(&rpc_controller, &req_name, &res_name, &cb);
if (rpc_controller.ErrorCode() != 0) {
logger() << "call rpc method query_name failed, errcode=" << rpc_controller.ErrorCode() << ",error=" << rpc_controller.ErrorText();
}
if (res_name.ret_code() != 0) {
logger() << "query name error, errcode=" << res_name.ret_code() << ", res_info=" << res_name.res_info();
} else {
logger() << "get res_name.age = " << res_name.name();
}
}
}
int main()
{
// TcpServer server;
TcpClient client(NetAddress("127.0.0.1", 9001));
Coroutine cor(test);
client.addCoroutine(cor);
client.start();
return 0;
}

View File

@ -0,0 +1,38 @@
syntax = "proto3";
option cc_generic_services = true;
message queryAgeReq {
int32 req_no = 1;
int32 id = 2;
}
message queryAgeRes {
int32 ret_code = 1;
string res_info = 2;
int32 req_no = 3;
int32 id = 4;
int32 age = 5;
}
message queryNameReq {
int32 req_no = 1;
int32 id = 2;
int32 type = 3;
}
message queryNameRes {
int32 ret_code = 1;
string res_info = 2;
int32 req_no = 3;
int32 id = 4;
string name = 5;
}
service QueryService {
// rpc method name
rpc query_name(queryNameReq) returns (queryNameRes);
// rpc method name
rpc query_age(queryAgeReq) returns (queryAgeRes);
}

27
test/codertest/main.cc Normal file
View File

@ -0,0 +1,27 @@
#include <iostream>
#include "tinypb_coder.hpp"
#include "tinypb_data.hpp"
using namespace std;
using namespace tinyrpc;
int main() {
TinypbData data;
data.msg_req = "11231231312";
data.pb_data = "121231233456";
data.service_full_name = "aaa.b12313bb";
TinypbCoder coder;
TcpBuffer buffer;
TinypbData newdata;
coder.encoder(buffer, data);
coder.decoder(buffer, newdata);
cout << newdata.msg_req << endl;
cout << newdata.pb_data << endl;
cout << newdata.service_full_name << endl;
return 0;
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

Binary file not shown.

View File

@ -0,0 +1,56 @@
#include "logger.hpp"
#include "tcp_server.hpp"
#include "tinypb.pb.h"
#include <google/protobuf/service.h>
#include <iostream>
using namespace std;
using namespace tinyrpc;
class QueryServiceImpl : public QueryService {
public:
QueryServiceImpl() { }
~QueryServiceImpl() { }
void query_name(google::protobuf::RpcController* controller,
const ::queryNameReq* request,
::queryNameRes* response,
::google::protobuf::Closure* done) override
{
logger() << "QueryServiceImpl.query_name, req={" << request->ShortDebugString() << "}";
logger() << response;
response->set_ret_code(0);
response->set_res_info("OK");
response->set_req_no(request->req_no());
response->set_id(request->id());
response->set_name("yyy");
done->Run();
}
void query_age(google::protobuf::RpcController* controller,
const ::queryAgeReq* request,
::queryAgeRes* response,
::google::protobuf::Closure* done) override
{
logger() << "QueryServiceImpl.query_name, req={" << request->ShortDebugString() << "}";
response->set_ret_code(0);
response->set_res_info("OK");
response->set_req_no(request->req_no());
response->set_id(request->id());
response->set_age(20);
done->Run();
}
private:
};
int main()
{
TcpServer server("0.0.0.0", 9001);
server.registerService(std::make_shared<QueryServiceImpl>());
server.start();
return 0;
}

1439
test/servertest/tinypb.pb.cc Normal file

File diff suppressed because it is too large Load Diff

1383
test/servertest/tinypb.pb.h Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,38 @@
syntax = "proto3";
option cc_generic_services = true;
message queryAgeReq {
int32 req_no = 1;
int32 id = 2;
}
message queryAgeRes {
int32 ret_code = 1;
string res_info = 2;
int32 req_no = 3;
int32 id = 4;
int32 age = 5;
}
message queryNameReq {
int32 req_no = 1;
int32 id = 2;
int32 type = 3;
}
message queryNameRes {
int32 ret_code = 1;
string res_info = 2;
int32 req_no = 3;
int32 id = 4;
string name = 5;
}
service QueryService {
// rpc method name
rpc query_name(queryNameReq) returns (queryNameRes);
// rpc method name
rpc query_age(queryAgeReq) returns (queryAgeRes);
}

173
test/tcp_connection.cc Normal file
View File

@ -0,0 +1,173 @@
// #include "tcp_connection.hpp"
// #include "abstract_coder.hpp"
// #include "coroutine_hook.hpp"
// #include "fd_event.hpp"
// #include "logger.hpp"
// #include "reactor.hpp"
// #include "tcp_client.hpp"
// #include "tcp_server.hpp"
// #include "tinypb_data.hpp"
// #include <cerrno>
// #include <cstring>
// #include <memory>
// #include <pthread.h>
// #include <unistd.h>
// namespace tinyrpc {
// TcpConnection::TcpConnection(int fd, Reactor& reactor, TcpServer& ser, Type type) :
// m_fdEvent(FdEventPool::getInstance()->getFdEvent(fd)),
// m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)),
// m_reactor(reactor),
// m_server(ser),
// m_connectType(type)
// {
// Reactor::Task task = [this] {
// logger() << "conn coroutine is resume";
// m_mainCoroutine.resume();
// };
// reactor.addTask(task, true);
// }
// void TcpConnection::mainLoopFun() {
// while(m_state == State::Connected) {
// input();
// process();
// output();
// }
// logger() << "this conn loop has already break";
// }
// void TcpConnection::clearClient() {
// logger() << "clearClient";
// m_state = State::Disconnected;
// m_reactor.delFdEvent(m_fdEvent);
// m_fdEvent->reset();
// close(m_fdEvent->getFd());
// }
// void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
// logger() << "input";
// if(m_state == State::Disconnected) {
// logger() << "input: this conn has already break";
// return;
// }
// while(true) {
// if(m_readBuffer.getWriteable() == 0) {
// m_readBuffer.dilatation();
// }
// int ret = read_hook(m_fdEvent->getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable());
// if(ret == -1) {
// logger() << "read_hook ret -1 err:" << strerror(errno);
// // if(errno == EAGAIN) exit(-1);
// break;
// } else if(ret == 0) { // 对端关闭了连接
// clearClient();
// break;
// } else {
// int writeable = m_readBuffer.getWriteable();
// m_readBuffer.writeOffset(ret);
// logger() << "input_hook ret: " << ret;
// if(ret < writeable) { // 读完了结束循环
// break;
// }
// }
// }
// }
// void TcpConnection::output() {
// logger() << "output";
// if(m_state == State::Disconnected) {
// return;
// }
// while(true) {
// if(m_writeBuffer.getReadable() == 0) {
// logger() << "no data need send";
// break;
// }
// int ret = write_hook(m_fdEvent->getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable());
// if(ret == -1) {
// logger() << "read_hook ret -1 err:" << strerror(errno);
// break;
// } else if(ret == 0) {
// logger() << "write_hook ret 0";
// break;
// } else {
// m_writeBuffer.readOffset(ret);
// }
// logger() << "write_hook ret: " << ret;
// }
// }
// void TcpConnection::process() {
// logger() << "process";
// // if(m_state == State::Disconnected) {
// // return;
// // }
// // if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) {
// // m_writeBuffer.resize(m_readBuffer.getReadable() * 2);
// // }
// // std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
// // m_writeBuffer.writeOffset(m_readBuffer.getReadable());
// // m_readBuffer.readOffset(m_readBuffer.getReadable());
// // logger() << "write data " << m_writeBuffer.getReadable() << " byte";
// while(m_readBuffer.getReadable() > 0) {
// std::shared_ptr<AbstractData> data(new TinypbData);
// bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
// if(ret == false) {
// logger() << "decode error";
// break;
// }
// if(m_connectType == Type::Server) {
// std::unique_ptr<AbstractData> resp(new TinypbData);
// m_server.getDispatcher().dispatcher(*this, *data, *resp);
// m_server.getCoder().encoder(m_writeBuffer, *resp);
// } else {
// std::shared_ptr<TinypbData> tmp = std::dynamic_pointer_cast<TinypbData>(data);
// m_respond_datas[tmp->msg_req] = data;
// }
// }
// }
// bool TcpConnection::getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct) {
// auto it = m_respond_datas.find(msg_req);
// if(it == m_respond_datas.end()) return false;
// pb_struct = it->second;
// m_respond_datas.erase(it);
// return true;
// }
// TcpConnection::~TcpConnection() {
// if(m_state == State::Connected) {
// clearClient();
// }
// logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor";
// }
// }

52
test/tcp_connection.hpp Normal file
View File

@ -0,0 +1,52 @@
// #pragma once
// #include "abstract_coder.hpp"
// #include "coroutine.hpp"
// #include "fd_event.hpp"
// #include "reactor.hpp"
// #include "tcp_buffer.hpp"
// #include "tcp_client.hpp"
// #include "tinypb_data.hpp"
// #include <memory>
// #include <unordered_map>
// namespace tinyrpc {
// class TcpServer;
// class TcpConnection {
// public:
// enum class State{
// Disconnected,
// Connected
// };
// enum class Type{
// Server,
// Client
// };
// public:
// TcpConnection(int fd, Reactor& reactor, TcpServer& ser, Type type = Type::Server);
// TcpConnection(int fd, Reactor& reactor, TcpClient& cli, Type type = Type::Client);
// void clearClient();
// void mainLoopFun();
// State getState() {return m_state;}
// bool getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct);
// ~TcpConnection();
// private:
// void input();
// void output();
// void process();
// private:
// FdEvent *m_fdEvent;
// Coroutine m_mainCoroutine;
// State m_state{State::Connected};
// TcpBuffer m_writeBuffer{};
// TcpBuffer m_readBuffer{};
// Reactor& m_reactor;
// TcpServer& m_server;
// // TcpClient& m_server;
// Type m_connectType{};
// std::unordered_map<std::string, std::shared_ptr<AbstractData>> m_respond_datas;
// };
// }