tcpConnect 拆分, tcpClient 初步
This commit is contained in:
parent
9d9fb69cbb
commit
77e37f3afc
@ -50,7 +50,6 @@ add_executable(test_tinyrpc
|
||||
# 引入 abseil-cpp 子目录
|
||||
add_subdirectory(./third_party/abseil-cpp absl)
|
||||
|
||||
|
||||
set(ABSEL_LIBARARY
|
||||
absl::absl_check
|
||||
absl::absl_log
|
||||
@ -88,7 +87,6 @@ set(ABSEL_LIBARARY
|
||||
absl::variant
|
||||
)
|
||||
|
||||
|
||||
# 链接库
|
||||
target_link_libraries(tinyrpc PRIVATE protobuf) # 链接 Protobuf 库
|
||||
target_link_libraries(tinyrpc PRIVATE ${ABSEL_LIBARARY}) # 链接 Protobuf 库
|
||||
|
@ -1,7 +1,8 @@
|
||||
#pragma once
|
||||
|
||||
#include "abstract_coder.hpp"
|
||||
#include "tcp_connection.hpp"
|
||||
#include "server_tcp_connect.hpp"
|
||||
|
||||
|
||||
namespace tinyrpc {
|
||||
|
||||
@ -10,7 +11,7 @@ namespace tinyrpc {
|
||||
public:
|
||||
AbstractDispatcher() = default;
|
||||
virtual ~AbstractDispatcher() = default;
|
||||
virtual void dispatcher(TcpConnection& conn, AbstractData& data, AbstractData& respond) = 0;
|
||||
virtual void dispatcher(ServerTcpConnection& conn, AbstractData& data, AbstractData& respond) = 0;
|
||||
|
||||
private:
|
||||
|
||||
|
7
includes/net/protocol_type.hpp
Normal file
7
includes/net/protocol_type.hpp
Normal file
@ -0,0 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
namespace tinyrpc {
|
||||
enum class ProtocolType{
|
||||
Tinypb,
|
||||
};
|
||||
}
|
46
includes/net/tcp/abstract_tcp_connect.hpp
Normal file
46
includes/net/tcp/abstract_tcp_connect.hpp
Normal file
@ -0,0 +1,46 @@
|
||||
#pragma once
|
||||
|
||||
#include "coroutine.hpp"
|
||||
#include "fd_event.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include "tcp_buffer.hpp"
|
||||
|
||||
namespace tinyrpc {
|
||||
class TcpServer;
|
||||
class AbstractTcpConnection {
|
||||
public:
|
||||
enum class State{
|
||||
Disconnected,
|
||||
Connected
|
||||
};
|
||||
// enum class Type{
|
||||
// Server,
|
||||
// Client
|
||||
// };
|
||||
|
||||
public:
|
||||
AbstractTcpConnection(int fd, Reactor& reactor, State state = State::Connected);
|
||||
void clearClient();
|
||||
void mainLoopFun();
|
||||
State getState() {return m_state;}
|
||||
// bool getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct); //cli
|
||||
virtual ~AbstractTcpConnection();
|
||||
|
||||
protected:
|
||||
void input();
|
||||
void output();
|
||||
virtual void process() = 0;
|
||||
|
||||
protected:
|
||||
FdEvent *m_fdEvent;
|
||||
Coroutine m_mainCoroutine;
|
||||
State m_state{State::Connected};
|
||||
TcpBuffer m_writeBuffer{};
|
||||
TcpBuffer m_readBuffer{};
|
||||
Reactor& m_reactor;
|
||||
// TcpServer& m_server;
|
||||
// TcpClient& m_server;
|
||||
// std::unordered_map<std::string, std::shared_ptr<AbstractData>> m_respond_datas; // cli
|
||||
};
|
||||
|
||||
}
|
25
includes/net/tcp/client_tcp_connect.hpp
Normal file
25
includes/net/tcp/client_tcp_connect.hpp
Normal file
@ -0,0 +1,25 @@
|
||||
#pragma once
|
||||
|
||||
#include "abstract_coder.hpp"
|
||||
#include "abstract_tcp_connect.hpp"
|
||||
#include <memory>
|
||||
|
||||
|
||||
namespace tinyrpc {
|
||||
class TcpClient;
|
||||
class ClientTcpConnection : AbstractTcpConnection {
|
||||
|
||||
public:
|
||||
ClientTcpConnection(int fd, Reactor& reactor, TcpClient& cli);
|
||||
~ClientTcpConnection();
|
||||
|
||||
private:
|
||||
void process() override;
|
||||
|
||||
private:
|
||||
TcpClient& m_client;
|
||||
std::unordered_map<std::string, std::shared_ptr<AbstractData>> m_respond_datas; // cli
|
||||
|
||||
};
|
||||
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
#include "abstract_tcp_connect.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include "tcp_connection.hpp"
|
||||
#include "server_tcp_connect.hpp"
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <unordered_map>
|
||||
@ -21,7 +22,7 @@ namespace tinyrpc {
|
||||
~IOThread();
|
||||
void mainFunc();
|
||||
private:
|
||||
std::unordered_map<int, std::shared_ptr<TcpConnection>> m_clients;
|
||||
std::unordered_map<int, std::shared_ptr<ServerTcpConnection>> m_clients;
|
||||
std::thread m_thread;
|
||||
Reactor* m_reactor{nullptr};
|
||||
};
|
||||
|
23
includes/net/tcp/server_tcp_connect.hpp
Normal file
23
includes/net/tcp/server_tcp_connect.hpp
Normal file
@ -0,0 +1,23 @@
|
||||
#pragma once
|
||||
|
||||
#include "abstract_tcp_connect.hpp"
|
||||
#include "reactor.hpp"
|
||||
|
||||
|
||||
namespace tinyrpc {
|
||||
class TcpServer;
|
||||
class ServerTcpConnection : AbstractTcpConnection {
|
||||
|
||||
public:
|
||||
ServerTcpConnection(int fd, Reactor& reactor, TcpServer& ser);
|
||||
~ServerTcpConnection();
|
||||
|
||||
private:
|
||||
void process() override;
|
||||
|
||||
private:
|
||||
TcpServer& m_server;
|
||||
|
||||
};
|
||||
|
||||
}
|
23
includes/net/tcp/tcp_client.hpp
Normal file
23
includes/net/tcp/tcp_client.hpp
Normal file
@ -0,0 +1,23 @@
|
||||
#pragma once
|
||||
|
||||
#include "abstract_coder.hpp"
|
||||
#include "client_tcp_connect.hpp"
|
||||
#include "net_address.hpp"
|
||||
#include "reactor.hpp"
|
||||
|
||||
namespace tinyrpc {
|
||||
class TcpClient {
|
||||
public:
|
||||
TcpClient(const NetAddress& peerAddr);
|
||||
AbstractCoder& getCoder() {return *m_coder;}
|
||||
~TcpClient();
|
||||
private:
|
||||
int m_fd{-1};
|
||||
NetAddress m_local_addr{};
|
||||
NetAddress m_peer_addr{};
|
||||
Reactor& m_reactor;
|
||||
ClientTcpConnection *m_connection;
|
||||
AbstractCoder* m_coder{};
|
||||
};
|
||||
|
||||
}
|
@ -1,42 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include "abstract_coder.hpp"
|
||||
#include "coroutine.hpp"
|
||||
#include "fd_event.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include "tcp_buffer.hpp"
|
||||
|
||||
namespace tinyrpc {
|
||||
class TcpServer;
|
||||
class TcpConnection {
|
||||
public:
|
||||
enum class State{
|
||||
Disconnected,
|
||||
Connected
|
||||
};
|
||||
|
||||
|
||||
public:
|
||||
TcpConnection(int fd, Reactor& reactor, TcpServer& ser);
|
||||
void clearClient();
|
||||
void mainLoopFun();
|
||||
|
||||
~TcpConnection();
|
||||
|
||||
private:
|
||||
void input();
|
||||
void output();
|
||||
void process();
|
||||
|
||||
|
||||
private:
|
||||
FdEvent *m_fdEvent;
|
||||
Coroutine m_mainCoroutine;
|
||||
State m_state{State::Connected};
|
||||
TcpBuffer m_writeBuffer{};
|
||||
TcpBuffer m_readBuffer{};
|
||||
Reactor& m_reactor;
|
||||
TcpServer& m_server;
|
||||
};
|
||||
|
||||
}
|
@ -4,7 +4,9 @@
|
||||
#include "coroutine.hpp"
|
||||
#include "io_thread.hpp"
|
||||
#include "net_address.hpp"
|
||||
#include "protocol_type.hpp"
|
||||
#include <cstdint>
|
||||
#include <google/protobuf/service.h>
|
||||
// #include "reactor.hpp"
|
||||
|
||||
namespace tinyrpc {
|
||||
@ -28,6 +30,7 @@ namespace tinyrpc {
|
||||
void start();
|
||||
AbstractCoder& getCoder() {return *m_coder;}
|
||||
AbstractDispatcher& getDispatcher() {return *m_dispatcher;}
|
||||
void registerService(std::shared_ptr<google::protobuf::Service> service);
|
||||
private:
|
||||
void mainAcceptCorFun();
|
||||
private:
|
||||
@ -41,6 +44,7 @@ namespace tinyrpc {
|
||||
IOThreadPool m_ioThreadPool{4};
|
||||
AbstractCoder* m_coder{};
|
||||
AbstractDispatcher* m_dispatcher{};
|
||||
ProtocolType m_protocolType{ProtocolType::Tinypb};
|
||||
};
|
||||
|
||||
}
|
27
includes/net/tinypb/tinypb_closure.hpp
Normal file
27
includes/net/tinypb/tinypb_closure.hpp
Normal file
@ -0,0 +1,27 @@
|
||||
#pragma once
|
||||
|
||||
#include <google/protobuf/stubs/callback.h>
|
||||
#include <functional>
|
||||
|
||||
namespace tinyrpc {
|
||||
|
||||
class TinypbClosure : public google::protobuf::Closure {
|
||||
public:
|
||||
explicit TinypbClosure(const std::function<void()>& cb) : m_callback(cb){}
|
||||
~TinypbClosure() = default;
|
||||
|
||||
void Run() override {
|
||||
if(m_callback) {
|
||||
m_callback();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
std::function<void()> m_callback{};
|
||||
|
||||
|
||||
|
||||
};
|
||||
|
||||
}
|
90
includes/net/tinypb/tinypb_controller.hpp
Normal file
90
includes/net/tinypb/tinypb_controller.hpp
Normal file
@ -0,0 +1,90 @@
|
||||
#pragma once
|
||||
|
||||
#include "net_address.hpp"
|
||||
#include <google/protobuf/service.h>
|
||||
|
||||
namespace tinyrpc {
|
||||
|
||||
class TinypbController : public google::protobuf::RpcController {
|
||||
public:
|
||||
|
||||
|
||||
|
||||
TinypbController() = default;
|
||||
|
||||
~TinypbController() = default;
|
||||
|
||||
void Reset() override{}
|
||||
|
||||
bool Failed() const override{return m_is_failed;}
|
||||
|
||||
|
||||
// Server-side methods ---------------------------------------------
|
||||
|
||||
std::string ErrorText() const override {return m_error_info;}
|
||||
|
||||
void StartCancel() override{}
|
||||
|
||||
void SetFailed(const std::string& reason) override {
|
||||
m_is_failed = true;
|
||||
m_error_info = reason;
|
||||
}
|
||||
|
||||
bool IsCanceled() const override {
|
||||
return m_is_canceled;
|
||||
}
|
||||
|
||||
void NotifyOnCancel(google::protobuf::Closure* callback) override {}
|
||||
// common methods
|
||||
|
||||
int ErrorCode() const {return m_error_code;}
|
||||
|
||||
void SetErrorCode(const int error_code) {m_error_code = error_code;}
|
||||
|
||||
void SetError(const int err_code, const std::string& err_info) {
|
||||
SetFailed(err_info);
|
||||
SetErrorCode(err_code);
|
||||
}
|
||||
|
||||
void SetPeerAddr(const NetAddress& addr) {
|
||||
m_peer_addr = addr;
|
||||
}
|
||||
|
||||
void SetLocalAddr(const NetAddress& addr) {
|
||||
m_local_addr = addr;
|
||||
}
|
||||
|
||||
const NetAddress& PeerAddr() {return m_peer_addr;}
|
||||
|
||||
const NetAddress& LocalAddr(){return m_local_addr;}
|
||||
|
||||
void SetTimeout(const int timeout) {m_timeout = timeout;}
|
||||
|
||||
int Timeout() const {return m_timeout;}
|
||||
|
||||
void SetMsgReq(const std::string& msg_req ) {m_msg_req = msg_req;}
|
||||
void SetMethodName(const std::string& method_name ) {m_method_name = method_name;}
|
||||
void SetFullName(const std::string& full_name ) {m_full_name = full_name;}
|
||||
const std::string GetMsgReq() const {return m_msg_req;}
|
||||
const std::string GetMethodName() const {return m_method_name;}
|
||||
const std::string GetFullName() const {return m_full_name;}
|
||||
|
||||
|
||||
private:
|
||||
int m_error_code {0};
|
||||
bool m_is_failed {false};
|
||||
bool m_is_canceled {false};
|
||||
std::string m_error_info{};
|
||||
std::string m_msg_req{};
|
||||
std::string m_method_name{};
|
||||
std::string m_full_name{};
|
||||
|
||||
int m_timeout {5000};
|
||||
NetAddress m_peer_addr{};
|
||||
NetAddress m_local_addr{};
|
||||
|
||||
|
||||
|
||||
};
|
||||
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "abstract_dispatcher.hpp"
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
#include <google/protobuf/message.h>
|
||||
#include <google/protobuf/service.h>
|
||||
@ -15,10 +16,11 @@ namespace tinyrpc {
|
||||
public:
|
||||
TinypbDispatcher();
|
||||
~TinypbDispatcher();
|
||||
void dispatcher(TcpConnection& conn, AbstractData& data, AbstractData& respond) override;
|
||||
void dispatcher(ServerTcpConnection& conn, AbstractData& data, AbstractData& respond) override;
|
||||
bool parseServiceFullName(const std::string& name, std::string& serviceName, std::string& methodName);
|
||||
void registerService(std::shared_ptr<Service>& service);
|
||||
private:
|
||||
std::unordered_map<std::string, Service*> m_service_map;
|
||||
std::unordered_map<std::string, std::shared_ptr<Service>> m_service_map;
|
||||
|
||||
};
|
||||
|
||||
|
@ -1,24 +1,17 @@
|
||||
#include "tcp_connection.hpp"
|
||||
#include "abstract_coder.hpp"
|
||||
#include "abstract_tcp_connect.hpp"
|
||||
#include "coroutine_hook.hpp"
|
||||
#include "fd_event.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include "tcp_server.hpp"
|
||||
#include "tinypb_data.hpp"
|
||||
#include <cerrno>
|
||||
#include <cstring>
|
||||
#include <memory>
|
||||
#include <pthread.h>
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
|
||||
namespace tinyrpc {
|
||||
TcpConnection::TcpConnection(int fd, Reactor& reactor, TcpServer& ser) :
|
||||
AbstractTcpConnection::AbstractTcpConnection(int fd, Reactor& reactor, State state) :
|
||||
m_fdEvent(FdEventPool::getInstance()->getFdEvent(fd)),
|
||||
m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)),
|
||||
m_reactor(reactor),
|
||||
m_server(ser)
|
||||
m_mainCoroutine(std::bind(&AbstractTcpConnection::mainLoopFun, this)),
|
||||
m_state(state),
|
||||
m_reactor(reactor)
|
||||
|
||||
{
|
||||
Reactor::Task task = [this] {
|
||||
logger() << "conn coroutine is resume";
|
||||
@ -29,7 +22,8 @@ namespace tinyrpc {
|
||||
|
||||
}
|
||||
|
||||
void TcpConnection::mainLoopFun() {
|
||||
|
||||
void AbstractTcpConnection::mainLoopFun() {
|
||||
while(m_state == State::Connected) {
|
||||
input();
|
||||
process();
|
||||
@ -38,7 +32,7 @@ namespace tinyrpc {
|
||||
logger() << "this conn loop has already break";
|
||||
}
|
||||
|
||||
void TcpConnection::clearClient() {
|
||||
void AbstractTcpConnection::clearClient() {
|
||||
logger() << "clearClient";
|
||||
m_state = State::Disconnected;
|
||||
m_reactor.delFdEvent(m_fdEvent);
|
||||
@ -47,7 +41,7 @@ namespace tinyrpc {
|
||||
}
|
||||
|
||||
|
||||
void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
|
||||
void AbstractTcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
|
||||
logger() << "input";
|
||||
if(m_state == State::Disconnected) {
|
||||
logger() << "input: this conn has already break";
|
||||
@ -84,7 +78,7 @@ namespace tinyrpc {
|
||||
|
||||
|
||||
}
|
||||
void TcpConnection::output() {
|
||||
void AbstractTcpConnection::output() {
|
||||
logger() << "output";
|
||||
if(m_state == State::Disconnected) {
|
||||
return;
|
||||
@ -113,42 +107,50 @@ namespace tinyrpc {
|
||||
|
||||
}
|
||||
|
||||
void TcpConnection::process() {
|
||||
logger() << "process";
|
||||
// if(m_state == State::Disconnected) {
|
||||
// return;
|
||||
// void AbstractTcpConnection::process() {
|
||||
// logger() << "process";
|
||||
|
||||
// // if(m_state == State::Disconnected) {
|
||||
// // return;
|
||||
// // }
|
||||
// // if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) {
|
||||
// // m_writeBuffer.resize(m_readBuffer.getReadable() * 2);
|
||||
// // }
|
||||
// // std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
|
||||
// // m_writeBuffer.writeOffset(m_readBuffer.getReadable());
|
||||
// // m_readBuffer.readOffset(m_readBuffer.getReadable());
|
||||
|
||||
// // logger() << "write data " << m_writeBuffer.getReadable() << " byte";
|
||||
|
||||
// while(m_readBuffer.getReadable() > 0) {
|
||||
// std::shared_ptr<AbstractData> data(new TinypbData);
|
||||
|
||||
// bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
|
||||
// if(ret == false) {
|
||||
// logger() << "decode error";
|
||||
// break;
|
||||
// }
|
||||
// if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) {
|
||||
// m_writeBuffer.resize(m_readBuffer.getReadable() * 2);
|
||||
// if(m_connectType == Type::Server) {
|
||||
// std::unique_ptr<AbstractData> resp(new TinypbData);
|
||||
// m_server.getDispatcher().dispatcher(*this, *data, *resp);
|
||||
// m_server.getCoder().encoder(m_writeBuffer, *resp);
|
||||
// } else {
|
||||
// std::shared_ptr<TinypbData> tmp = std::dynamic_pointer_cast<TinypbData>(data);
|
||||
// m_respond_datas[tmp->msg_req] = data;
|
||||
// }
|
||||
// std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
|
||||
// m_writeBuffer.writeOffset(m_readBuffer.getReadable());
|
||||
// m_readBuffer.readOffset(m_readBuffer.getReadable());
|
||||
|
||||
// logger() << "write data " << m_writeBuffer.getReadable() << " byte";
|
||||
|
||||
while(m_readBuffer.getReadable() > 0) {
|
||||
std::unique_ptr<AbstractData> data(new TinypbData);
|
||||
// }
|
||||
|
||||
bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
|
||||
if(ret == false) {
|
||||
logger() << "decode error";
|
||||
break;
|
||||
}
|
||||
std::unique_ptr<AbstractData> resp(new TinypbData);
|
||||
m_server.getDispatcher().dispatcher(*this, *data, *resp);
|
||||
// }
|
||||
|
||||
m_server.getCoder().encoder(m_writeBuffer, *resp);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
TcpConnection::~TcpConnection() {
|
||||
AbstractTcpConnection::~AbstractTcpConnection() {
|
||||
if(m_state == State::Connected) {
|
||||
clearClient();
|
||||
}
|
||||
logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor";
|
||||
|
||||
logger() << "AbstractTcpConnection fd " << m_fdEvent->getFd() << " destructor";
|
||||
}
|
||||
|
||||
}
|
||||
|
38
src/net/tcp/client_tcp_connect.cc
Normal file
38
src/net/tcp/client_tcp_connect.cc
Normal file
@ -0,0 +1,38 @@
|
||||
#include "client_tcp_connect.hpp"
|
||||
#include "abstract_tcp_connect.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "tcp_client.hpp"
|
||||
#include "tinypb_data.hpp"
|
||||
#include <memory>
|
||||
|
||||
|
||||
|
||||
namespace tinyrpc {
|
||||
ClientTcpConnection::ClientTcpConnection(int fd, Reactor& reactor, TcpClient& cli) :
|
||||
AbstractTcpConnection(fd, reactor),
|
||||
m_client(cli)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
ClientTcpConnection::~ClientTcpConnection() {
|
||||
|
||||
}
|
||||
|
||||
void ClientTcpConnection::process() {
|
||||
while(m_readBuffer.getReadable() > 0) {
|
||||
std::shared_ptr<AbstractData> data(new TinypbData);
|
||||
|
||||
bool ret = m_client.getCoder().decoder(m_readBuffer, *data);
|
||||
|
||||
if(ret == false) {
|
||||
logger() << "decode error";
|
||||
break;
|
||||
}
|
||||
|
||||
std::shared_ptr<TinypbData> tmp = std::dynamic_pointer_cast<TinypbData>(data);
|
||||
m_respond_datas[tmp->msg_req] = data;
|
||||
|
||||
}
|
||||
}
|
||||
}
|
@ -2,7 +2,7 @@
|
||||
#include "logger.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include "coroutine.hpp"
|
||||
#include "tcp_connection.hpp"
|
||||
#include "server_tcp_connect.hpp"
|
||||
#include "tcp_server.hpp"
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
@ -26,7 +26,7 @@ namespace tinyrpc {
|
||||
|
||||
void IOThread::addClient(TcpServer* ser, int fd) {
|
||||
|
||||
m_clients[fd] = std::make_shared<TcpConnection>(fd, *m_reactor, *ser);
|
||||
m_clients[fd] = std::shared_ptr<ServerTcpConnection>(new ServerTcpConnection(fd, *m_reactor, *ser));
|
||||
|
||||
}
|
||||
|
||||
|
35
src/net/tcp/server_tcp_connect.cc
Normal file
35
src/net/tcp/server_tcp_connect.cc
Normal file
@ -0,0 +1,35 @@
|
||||
#include "server_tcp_connect.hpp"
|
||||
#include "abstract_tcp_connect.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "tcp_server.hpp"
|
||||
#include "tinypb_data.hpp"
|
||||
|
||||
|
||||
namespace tinyrpc {
|
||||
ServerTcpConnection::ServerTcpConnection(int fd, Reactor& reactor, TcpServer& ser) :
|
||||
AbstractTcpConnection(fd, reactor),
|
||||
m_server(ser)
|
||||
{
|
||||
|
||||
}
|
||||
ServerTcpConnection::~ServerTcpConnection() {
|
||||
|
||||
}
|
||||
void ServerTcpConnection::process() {
|
||||
while(m_readBuffer.getReadable() > 0) {
|
||||
std::shared_ptr<AbstractData> data(new TinypbData);
|
||||
|
||||
bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
|
||||
|
||||
if(ret == false) {
|
||||
logger() << "decode error";
|
||||
break;
|
||||
}
|
||||
|
||||
std::unique_ptr<AbstractData> resp(new TinypbData);
|
||||
m_server.getDispatcher().dispatcher(*this, *data, *resp);
|
||||
m_server.getCoder().encoder(m_writeBuffer, *resp);
|
||||
|
||||
}
|
||||
}
|
||||
}
|
35
src/net/tcp/tcp_client.cc
Normal file
35
src/net/tcp/tcp_client.cc
Normal file
@ -0,0 +1,35 @@
|
||||
#include "tcp_client.hpp"
|
||||
#include "client_tcp_connect.hpp"
|
||||
#include "fd_event.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "net_address.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include "tinypb_coder.hpp"
|
||||
#include <unistd.h>
|
||||
|
||||
|
||||
namespace tinyrpc {
|
||||
|
||||
TcpClient::TcpClient(const NetAddress& peerAddr) :
|
||||
m_peer_addr(peerAddr),
|
||||
m_reactor(*Reactor::getReactor())
|
||||
|
||||
{
|
||||
m_fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (m_fd == -1) {
|
||||
logger() << "call socket error, fd=-1, sys error=" << strerror(errno);
|
||||
}
|
||||
m_local_addr = NetAddress("127.0.0.1", 0);
|
||||
m_coder = new TinypbCoder();
|
||||
m_connection = new ClientTcpConnection(m_fd, m_reactor, *this);
|
||||
|
||||
}
|
||||
|
||||
TcpClient::~TcpClient() {
|
||||
m_reactor.delFdEvent(FdEventPool::getInstance()->getFdEvent(m_fd));
|
||||
if(m_fd != -1) close(m_fd);
|
||||
delete m_coder;
|
||||
delete m_connection;
|
||||
}
|
||||
|
||||
}
|
@ -4,6 +4,7 @@
|
||||
#include "logger.hpp"
|
||||
#include "coroutine_hook.hpp"
|
||||
#include "net_address.hpp"
|
||||
#include "protocol_type.hpp"
|
||||
#include "reactor.hpp"
|
||||
#include "tinypb_coder.hpp"
|
||||
#include "tinypb_dispatcher.hpp"
|
||||
@ -103,4 +104,12 @@ namespace tinyrpc {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void TcpServer::registerService(std::shared_ptr<google::protobuf::Service> service) {
|
||||
if(m_protocolType != ProtocolType::Tinypb) return;
|
||||
if(service) {
|
||||
dynamic_cast<TinypbDispatcher*>(m_dispatcher)->registerService(service);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -1,8 +1,9 @@
|
||||
#include "tinypb_dispatcher.hpp"
|
||||
#include "logger.hpp"
|
||||
#include "tinypb_closure.hpp"
|
||||
#include "tinypb_controller.hpp"
|
||||
#include "tinypb_data.hpp"
|
||||
#include "error_code.hpp"
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
|
||||
@ -12,16 +13,22 @@ namespace tinyrpc {
|
||||
TinypbDispatcher::~TinypbDispatcher() {
|
||||
|
||||
}
|
||||
void TinypbDispatcher::dispatcher(TcpConnection& conn, AbstractData& data, AbstractData& resp) {
|
||||
void TinypbDispatcher::dispatcher(ServerTcpConnection& conn, AbstractData& data, AbstractData& resp) {
|
||||
logger() << "dispatcher";
|
||||
TinypbData& pbdata = dynamic_cast<TinypbData&>(data);
|
||||
TinypbData& request = dynamic_cast<TinypbData&>(data);
|
||||
TinypbData& respond = dynamic_cast<TinypbData&>(resp);
|
||||
|
||||
std::string service_name;
|
||||
std::string method_name;
|
||||
|
||||
respond.service_full_name = pbdata.service_full_name;
|
||||
respond.msg_req = pbdata.msg_req;
|
||||
bool ret = parseServiceFullName(pbdata.service_full_name, service_name, method_name);
|
||||
respond.service_full_name = request.service_full_name;
|
||||
respond.msg_req = request.msg_req;
|
||||
|
||||
logger() << "request service_full_name:" << "[" << request.service_full_name << "]";
|
||||
logger() << "request msg_req:" << "[" << request.msg_req << "]";
|
||||
|
||||
bool ret = parseServiceFullName(request.service_full_name, service_name, method_name);
|
||||
|
||||
if(ret == false) {
|
||||
respond.err_code = ERROR_PARSE_SERVICE_NAME;
|
||||
std::stringstream ss;
|
||||
@ -29,6 +36,7 @@ namespace tinyrpc {
|
||||
respond.err_info = ss.str();
|
||||
return;
|
||||
}
|
||||
logger() << "request method_name:" << "[" << method_name << "]";
|
||||
|
||||
auto it = m_service_map.find(service_name);
|
||||
|
||||
@ -41,7 +49,7 @@ namespace tinyrpc {
|
||||
|
||||
}
|
||||
|
||||
Service* service = it->second;
|
||||
std::shared_ptr<Service> service = it->second;
|
||||
|
||||
const Method* method = service->GetDescriptor()->FindMethodByName(method_name);
|
||||
// const Method* method = nullptr;
|
||||
@ -54,7 +62,7 @@ namespace tinyrpc {
|
||||
}
|
||||
std::unique_ptr<Message> requestMsg (service->GetRequestPrototype(method).New());
|
||||
|
||||
ret = requestMsg->ParseFromString(pbdata.pb_data);
|
||||
ret = requestMsg->ParseFromString(request.pb_data);
|
||||
|
||||
if(ret == false) {
|
||||
respond.err_code = ERROR_FAILED_SERIALIZE;
|
||||
@ -77,7 +85,13 @@ namespace tinyrpc {
|
||||
}
|
||||
};
|
||||
|
||||
service->CallMethod(method, nullptr, requestMsg.get(), respondMsg.get(), nullptr /* callback */);
|
||||
std::unique_ptr<TinypbController> rpcController(new TinypbController);
|
||||
rpcController->SetMsgReq(respond.msg_req);
|
||||
rpcController->SetFullName(respond.service_full_name);
|
||||
rpcController->SetMethodName(method_name);
|
||||
TinypbClosure done(callback);
|
||||
|
||||
service->CallMethod(method, rpcController.get(), requestMsg.get(), respondMsg.get(), &done /* callback */);
|
||||
|
||||
}
|
||||
|
||||
@ -89,10 +103,16 @@ namespace tinyrpc {
|
||||
serviceName = name.substr(0, pos);
|
||||
methodName = name.substr(pos + 1);
|
||||
|
||||
logger() << "serviceName=" << serviceName;
|
||||
logger() << "methodName=" << methodName;
|
||||
// logger() << "serviceName=" << serviceName;
|
||||
// logger() << "methodName=" << methodName;
|
||||
return true;
|
||||
}
|
||||
|
||||
void TinypbDispatcher::registerService(std::shared_ptr<Service>& service) {
|
||||
std::string service_name = service->GetDescriptor()->full_name();
|
||||
m_service_map[service_name] = service;
|
||||
logger() << "success register service:" << "[" << service_name << "]";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
22
test/servertest/clienttest.cc
Normal file
22
test/servertest/clienttest.cc
Normal file
@ -0,0 +1,22 @@
|
||||
#include <iostream>
|
||||
#include "reactor.hpp"
|
||||
#include "tcp_server.hpp"
|
||||
using namespace std;
|
||||
using namespace tinyrpc;
|
||||
void test() {
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
int main() {
|
||||
TcpServer server;
|
||||
|
||||
// server.
|
||||
Reactor::getReactor()->addTask(test);
|
||||
server.start();
|
||||
|
||||
return 0;
|
||||
}
|
51
test/servertest/servertest.cc
Normal file
51
test/servertest/servertest.cc
Normal file
@ -0,0 +1,51 @@
|
||||
#include "logger.hpp"
|
||||
#include "tcp_server.hpp"
|
||||
#include "tinypb.pb.h"
|
||||
#include <google/protobuf/service.h>
|
||||
#include <iostream>
|
||||
using namespace std;
|
||||
using namespace tinyrpc;
|
||||
|
||||
class QueryServiceImpl : public QueryService {
|
||||
public:
|
||||
QueryServiceImpl() { }
|
||||
~QueryServiceImpl() { }
|
||||
|
||||
void query_name(google::protobuf::RpcController* controller,
|
||||
const ::queryNameReq* request,
|
||||
::queryNameRes* response,
|
||||
::google::protobuf::Closure* done) override
|
||||
{
|
||||
logger() << "QueryServiceImpl.query_name, req={" << request->ShortDebugString() << "}";
|
||||
response->set_ret_code(0);
|
||||
response->set_res_info("OK");
|
||||
response->set_req_no(request->req_no());
|
||||
response->set_id(request->id());
|
||||
response->set_name("yyy");
|
||||
done->Run();
|
||||
}
|
||||
|
||||
void query_age(google::protobuf::RpcController* controller,
|
||||
const ::queryAgeReq* request,
|
||||
::queryAgeRes* response,
|
||||
::google::protobuf::Closure* done) override
|
||||
{
|
||||
logger() << "QueryServiceImpl.query_name, req={" << request->ShortDebugString() << "}";
|
||||
response->set_ret_code(0);
|
||||
response->set_res_info("OK");
|
||||
response->set_req_no(request->req_no());
|
||||
response->set_id(request->id());
|
||||
response->set_age(20);
|
||||
done->Run();
|
||||
}
|
||||
|
||||
private:
|
||||
};
|
||||
|
||||
int main()
|
||||
{
|
||||
TcpServer server("0.0.0.0", 9001);
|
||||
server.registerService(std::make_shared<QueryServiceImpl>());
|
||||
server.start();
|
||||
return 0;
|
||||
}
|
1439
test/servertest/tinypb.pb.cc
Normal file
1439
test/servertest/tinypb.pb.cc
Normal file
File diff suppressed because it is too large
Load Diff
1383
test/servertest/tinypb.pb.h
Normal file
1383
test/servertest/tinypb.pb.h
Normal file
File diff suppressed because it is too large
Load Diff
38
test/servertest/tinypb.proto
Normal file
38
test/servertest/tinypb.proto
Normal file
@ -0,0 +1,38 @@
|
||||
syntax = "proto3";
|
||||
option cc_generic_services = true;
|
||||
|
||||
message queryAgeReq {
|
||||
int32 req_no = 1;
|
||||
int32 id = 2;
|
||||
}
|
||||
|
||||
message queryAgeRes {
|
||||
int32 ret_code = 1;
|
||||
string res_info = 2;
|
||||
int32 req_no = 3;
|
||||
int32 id = 4;
|
||||
int32 age = 5;
|
||||
}
|
||||
|
||||
message queryNameReq {
|
||||
int32 req_no = 1;
|
||||
int32 id = 2;
|
||||
int32 type = 3;
|
||||
}
|
||||
|
||||
message queryNameRes {
|
||||
int32 ret_code = 1;
|
||||
string res_info = 2;
|
||||
int32 req_no = 3;
|
||||
int32 id = 4;
|
||||
string name = 5;
|
||||
}
|
||||
|
||||
|
||||
service QueryService {
|
||||
// rpc method name
|
||||
rpc query_name(queryNameReq) returns (queryNameRes);
|
||||
|
||||
// rpc method name
|
||||
rpc query_age(queryAgeReq) returns (queryAgeRes);
|
||||
}
|
173
test/tcp_connection.cc
Normal file
173
test/tcp_connection.cc
Normal file
@ -0,0 +1,173 @@
|
||||
// #include "tcp_connection.hpp"
|
||||
// #include "abstract_coder.hpp"
|
||||
// #include "coroutine_hook.hpp"
|
||||
// #include "fd_event.hpp"
|
||||
// #include "logger.hpp"
|
||||
// #include "reactor.hpp"
|
||||
// #include "tcp_client.hpp"
|
||||
// #include "tcp_server.hpp"
|
||||
// #include "tinypb_data.hpp"
|
||||
// #include <cerrno>
|
||||
// #include <cstring>
|
||||
// #include <memory>
|
||||
// #include <pthread.h>
|
||||
// #include <unistd.h>
|
||||
|
||||
|
||||
// namespace tinyrpc {
|
||||
// TcpConnection::TcpConnection(int fd, Reactor& reactor, TcpServer& ser, Type type) :
|
||||
// m_fdEvent(FdEventPool::getInstance()->getFdEvent(fd)),
|
||||
// m_mainCoroutine(std::bind(&TcpConnection::mainLoopFun, this)),
|
||||
// m_reactor(reactor),
|
||||
// m_server(ser),
|
||||
// m_connectType(type)
|
||||
// {
|
||||
// Reactor::Task task = [this] {
|
||||
// logger() << "conn coroutine is resume";
|
||||
// m_mainCoroutine.resume();
|
||||
// };
|
||||
|
||||
// reactor.addTask(task, true);
|
||||
|
||||
// }
|
||||
|
||||
|
||||
|
||||
// void TcpConnection::mainLoopFun() {
|
||||
// while(m_state == State::Connected) {
|
||||
// input();
|
||||
// process();
|
||||
// output();
|
||||
// }
|
||||
// logger() << "this conn loop has already break";
|
||||
// }
|
||||
|
||||
// void TcpConnection::clearClient() {
|
||||
// logger() << "clearClient";
|
||||
// m_state = State::Disconnected;
|
||||
// m_reactor.delFdEvent(m_fdEvent);
|
||||
// m_fdEvent->reset();
|
||||
// close(m_fdEvent->getFd());
|
||||
// }
|
||||
|
||||
|
||||
// void TcpConnection::input() { // 调用 read_hook 读数据到应用层缓冲区
|
||||
// logger() << "input";
|
||||
// if(m_state == State::Disconnected) {
|
||||
// logger() << "input: this conn has already break";
|
||||
// return;
|
||||
// }
|
||||
|
||||
|
||||
// while(true) {
|
||||
// if(m_readBuffer.getWriteable() == 0) {
|
||||
// m_readBuffer.dilatation();
|
||||
// }
|
||||
|
||||
// int ret = read_hook(m_fdEvent->getFd(), m_readBuffer.getWriteAddress(), m_readBuffer.getWriteable());
|
||||
// if(ret == -1) {
|
||||
// logger() << "read_hook ret -1 err:" << strerror(errno);
|
||||
// // if(errno == EAGAIN) exit(-1);
|
||||
// break;
|
||||
|
||||
// } else if(ret == 0) { // 对端关闭了连接
|
||||
// clearClient();
|
||||
// break;
|
||||
// } else {
|
||||
// int writeable = m_readBuffer.getWriteable();
|
||||
// m_readBuffer.writeOffset(ret);
|
||||
// logger() << "input_hook ret: " << ret;
|
||||
|
||||
// if(ret < writeable) { // 读完了结束循环
|
||||
// break;
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
|
||||
// }
|
||||
// void TcpConnection::output() {
|
||||
// logger() << "output";
|
||||
// if(m_state == State::Disconnected) {
|
||||
// return;
|
||||
// }
|
||||
|
||||
// while(true) {
|
||||
// if(m_writeBuffer.getReadable() == 0) {
|
||||
// logger() << "no data need send";
|
||||
// break;
|
||||
// }
|
||||
|
||||
// int ret = write_hook(m_fdEvent->getFd(), m_writeBuffer.getReadAddress(), m_writeBuffer.getReadable());
|
||||
// if(ret == -1) {
|
||||
// logger() << "read_hook ret -1 err:" << strerror(errno);
|
||||
// break;
|
||||
|
||||
// } else if(ret == 0) {
|
||||
// logger() << "write_hook ret 0";
|
||||
// break;
|
||||
// } else {
|
||||
// m_writeBuffer.readOffset(ret);
|
||||
// }
|
||||
// logger() << "write_hook ret: " << ret;
|
||||
// }
|
||||
|
||||
|
||||
// }
|
||||
|
||||
// void TcpConnection::process() {
|
||||
// logger() << "process";
|
||||
|
||||
// // if(m_state == State::Disconnected) {
|
||||
// // return;
|
||||
// // }
|
||||
// // if(m_writeBuffer.getWriteable() < m_readBuffer.getReadable()) {
|
||||
// // m_writeBuffer.resize(m_readBuffer.getReadable() * 2);
|
||||
// // }
|
||||
// // std::memcpy(m_writeBuffer.getWriteAddress(), m_readBuffer.getReadAddress(), m_readBuffer.getReadable());
|
||||
// // m_writeBuffer.writeOffset(m_readBuffer.getReadable());
|
||||
// // m_readBuffer.readOffset(m_readBuffer.getReadable());
|
||||
|
||||
// // logger() << "write data " << m_writeBuffer.getReadable() << " byte";
|
||||
|
||||
// while(m_readBuffer.getReadable() > 0) {
|
||||
// std::shared_ptr<AbstractData> data(new TinypbData);
|
||||
|
||||
// bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
|
||||
// if(ret == false) {
|
||||
// logger() << "decode error";
|
||||
// break;
|
||||
// }
|
||||
// if(m_connectType == Type::Server) {
|
||||
// std::unique_ptr<AbstractData> resp(new TinypbData);
|
||||
// m_server.getDispatcher().dispatcher(*this, *data, *resp);
|
||||
// m_server.getCoder().encoder(m_writeBuffer, *resp);
|
||||
// } else {
|
||||
// std::shared_ptr<TinypbData> tmp = std::dynamic_pointer_cast<TinypbData>(data);
|
||||
// m_respond_datas[tmp->msg_req] = data;
|
||||
// }
|
||||
|
||||
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
// bool TcpConnection::getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct) {
|
||||
// auto it = m_respond_datas.find(msg_req);
|
||||
// if(it == m_respond_datas.end()) return false;
|
||||
// pb_struct = it->second;
|
||||
// m_respond_datas.erase(it);
|
||||
// return true;
|
||||
// }
|
||||
|
||||
// TcpConnection::~TcpConnection() {
|
||||
// if(m_state == State::Connected) {
|
||||
// clearClient();
|
||||
// }
|
||||
|
||||
// logger() << "TcpConnection fd " << m_fdEvent->getFd() << " destructor";
|
||||
// }
|
||||
|
||||
// }
|
52
test/tcp_connection.hpp
Normal file
52
test/tcp_connection.hpp
Normal file
@ -0,0 +1,52 @@
|
||||
// #pragma once
|
||||
// #include "abstract_coder.hpp"
|
||||
// #include "coroutine.hpp"
|
||||
// #include "fd_event.hpp"
|
||||
// #include "reactor.hpp"
|
||||
// #include "tcp_buffer.hpp"
|
||||
// #include "tcp_client.hpp"
|
||||
// #include "tinypb_data.hpp"
|
||||
// #include <memory>
|
||||
// #include <unordered_map>
|
||||
|
||||
// namespace tinyrpc {
|
||||
// class TcpServer;
|
||||
// class TcpConnection {
|
||||
// public:
|
||||
// enum class State{
|
||||
// Disconnected,
|
||||
// Connected
|
||||
// };
|
||||
// enum class Type{
|
||||
// Server,
|
||||
// Client
|
||||
// };
|
||||
|
||||
// public:
|
||||
// TcpConnection(int fd, Reactor& reactor, TcpServer& ser, Type type = Type::Server);
|
||||
// TcpConnection(int fd, Reactor& reactor, TcpClient& cli, Type type = Type::Client);
|
||||
// void clearClient();
|
||||
// void mainLoopFun();
|
||||
// State getState() {return m_state;}
|
||||
// bool getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct);
|
||||
// ~TcpConnection();
|
||||
|
||||
// private:
|
||||
// void input();
|
||||
// void output();
|
||||
// void process();
|
||||
|
||||
// private:
|
||||
// FdEvent *m_fdEvent;
|
||||
// Coroutine m_mainCoroutine;
|
||||
// State m_state{State::Connected};
|
||||
// TcpBuffer m_writeBuffer{};
|
||||
// TcpBuffer m_readBuffer{};
|
||||
// Reactor& m_reactor;
|
||||
// TcpServer& m_server;
|
||||
// // TcpClient& m_server;
|
||||
// Type m_connectType{};
|
||||
// std::unordered_map<std::string, std::shared_ptr<AbstractData>> m_respond_datas;
|
||||
// };
|
||||
|
||||
// }
|
Loading…
Reference in New Issue
Block a user