初步完成rpc功能
This commit is contained in:
parent
29c4f49f9e
commit
43c0ef3507
4
.gitignore
vendored
4
.gitignore
vendored
@ -1,5 +1,5 @@
|
|||||||
*.pb.cc
|
tinypb.pb.cc
|
||||||
*.pb.cc
|
tinypb.pb.h
|
||||||
bin/
|
bin/
|
||||||
build/
|
build/
|
||||||
lib/
|
lib/
|
||||||
|
@ -40,13 +40,6 @@ add_library(tinyrpc
|
|||||||
${ASM_FILES}
|
${ASM_FILES}
|
||||||
)
|
)
|
||||||
|
|
||||||
aux_source_directory(${CMAKE_SOURCE_DIR}/test/returntest TEST_SRC_LIST)
|
|
||||||
|
|
||||||
|
|
||||||
add_executable(test_tinyrpc
|
|
||||||
${TEST_SRC_LIST}
|
|
||||||
)
|
|
||||||
|
|
||||||
# 引入 abseil-cpp 子目录
|
# 引入 abseil-cpp 子目录
|
||||||
add_subdirectory(./third_party/abseil-cpp absl)
|
add_subdirectory(./third_party/abseil-cpp absl)
|
||||||
|
|
||||||
@ -86,12 +79,31 @@ set(ABSEL_LIBARARY
|
|||||||
absl::utility
|
absl::utility
|
||||||
absl::variant
|
absl::variant
|
||||||
)
|
)
|
||||||
|
|
||||||
# 链接库
|
# 链接库
|
||||||
target_link_libraries(tinyrpc PRIVATE protobuf) # 链接 Protobuf 库
|
target_link_libraries(tinyrpc PRIVATE protobuf) # 链接 Protobuf 库
|
||||||
target_link_libraries(tinyrpc PRIVATE ${ABSEL_LIBARARY}) # 链接 Protobuf 库
|
target_link_libraries(tinyrpc PRIVATE ${ABSEL_LIBARARY}) # 链接 Protobuf 库
|
||||||
|
|
||||||
|
|
||||||
|
aux_source_directory(${CMAKE_SOURCE_DIR}/test/servertest SER_TEST_SRC_LIST)
|
||||||
|
aux_source_directory(${CMAKE_SOURCE_DIR}/test/clienttest CLI_TEST_SRC_LIST)
|
||||||
|
|
||||||
|
|
||||||
|
add_executable(test_tinyrpc_server
|
||||||
|
${SER_TEST_SRC_LIST}
|
||||||
|
)
|
||||||
|
add_executable(test_tinyrpc_client
|
||||||
|
${CLI_TEST_SRC_LIST}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
target_link_libraries(test_tinyrpc_server PRIVATE tinyrpc)
|
||||||
|
target_link_libraries(test_tinyrpc_client PRIVATE tinyrpc)
|
||||||
|
|
||||||
|
|
||||||
|
aux_source_directory(${CMAKE_SOURCE_DIR}/test/codertest TEST_SRC_LIST)
|
||||||
|
|
||||||
|
add_executable(test_tinyrpc
|
||||||
|
${TEST_SRC_LIST}
|
||||||
|
)
|
||||||
|
|
||||||
target_link_libraries(test_tinyrpc PRIVATE tinyrpc)
|
target_link_libraries(test_tinyrpc PRIVATE tinyrpc)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -8,17 +8,16 @@ namespace tinyrpc {
|
|||||||
friend void coFunction(Coroutine* co);
|
friend void coFunction(Coroutine* co);
|
||||||
private:
|
private:
|
||||||
Coroutine();
|
Coroutine();
|
||||||
|
void operator()() const { // 调用 这个协程的回调
|
||||||
|
m_callback();
|
||||||
|
}
|
||||||
public:
|
public:
|
||||||
// Coroutine(std::size_t stack_size, char* stack_sp);
|
// Coroutine(std::size_t stack_size, char* stack_sp);
|
||||||
|
|
||||||
Coroutine(std::function<void()> cb, std::size_t stack_size = 1 * 1024 * 1024/* , char* stack_sp */);
|
Coroutine(std::function<void()> cb, std::size_t stack_size = 1 * 1024 * 1024/* , char* stack_sp */);
|
||||||
|
Coroutine(const Coroutine&) = delete;
|
||||||
// int getCorID() const {return m_cor_id;}
|
// int getCorID() const {return m_cor_id;}
|
||||||
|
|
||||||
void operator()() const { // 调用 这个协程的回调
|
|
||||||
m_callback();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool isMainCoroutine() const {return m_stack_sp == nullptr;}
|
bool isMainCoroutine() const {return m_stack_sp == nullptr;}
|
||||||
|
|
||||||
// coctx* getContext() {return &m_ctx;}
|
// coctx* getContext() {return &m_ctx;}
|
||||||
|
@ -18,7 +18,7 @@ namespace tinyrpc {
|
|||||||
AbstractCoder() = default;
|
AbstractCoder() = default;
|
||||||
virtual ~AbstractCoder() = default;
|
virtual ~AbstractCoder() = default;
|
||||||
|
|
||||||
virtual bool encoder(TcpBuffer& buffer, AbstractData& data) = 0; // 编码
|
virtual bool encoder(TcpBuffer& buffer, const AbstractData& data) = 0; // 编码
|
||||||
|
|
||||||
virtual bool decoder(TcpBuffer& buffer, AbstractData& data) = 0; // 解码
|
virtual bool decoder(TcpBuffer& buffer, AbstractData& data) = 0; // 解码
|
||||||
|
|
||||||
|
@ -3,7 +3,8 @@
|
|||||||
namespace tinyrpc {
|
namespace tinyrpc {
|
||||||
|
|
||||||
enum ErrorCode {
|
enum ErrorCode {
|
||||||
ERROR_PEER_CLOSED, // connect when peer close
|
|
||||||
|
ERROR_PEER_CLOSED = 1, // connect when peer close
|
||||||
ERROR_FAILED_CONNECT, // failed to connection peer host
|
ERROR_FAILED_CONNECT, // failed to connection peer host
|
||||||
ERROR_FAILED_GET_REPLY, // failed to get server reply
|
ERROR_FAILED_GET_REPLY, // failed to get server reply
|
||||||
ERROR_FAILED_DESERIALIZE, // deserialize failed
|
ERROR_FAILED_DESERIALIZE, // deserialize failed
|
||||||
@ -19,7 +20,7 @@ enum ErrorCode {
|
|||||||
ERROR_METHOD_NOT_FOUND, // not found method
|
ERROR_METHOD_NOT_FOUND, // not found method
|
||||||
|
|
||||||
ERROR_PARSE_SERVICE_NAME, // not found service name
|
ERROR_PARSE_SERVICE_NAME, // not found service name
|
||||||
ERROR_ASYNC_RPC_CALL_SINGLE_IOTHREAD, // not supoort async rpc call when only have single iothread
|
ERROR_ASYNC_RPC_CALL_SINGLE_IOTHREAD = 12, // not supoort async rpc call when only have single iothread
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -25,8 +25,9 @@ namespace tinyrpc {
|
|||||||
State getState() {return m_state;}
|
State getState() {return m_state;}
|
||||||
// bool getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct); //cli
|
// bool getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct); //cli
|
||||||
virtual ~AbstractTcpConnection();
|
virtual ~AbstractTcpConnection();
|
||||||
|
void addMainTaskToReactor();
|
||||||
|
|
||||||
protected:
|
public:
|
||||||
void input();
|
void input();
|
||||||
void output();
|
void output();
|
||||||
virtual void process() = 0;
|
virtual void process() = 0;
|
||||||
|
@ -2,18 +2,19 @@
|
|||||||
|
|
||||||
#include "abstract_coder.hpp"
|
#include "abstract_coder.hpp"
|
||||||
#include "abstract_tcp_connect.hpp"
|
#include "abstract_tcp_connect.hpp"
|
||||||
|
#include "tcp_buffer.hpp"
|
||||||
#include <memory>
|
#include <memory>
|
||||||
|
|
||||||
|
|
||||||
namespace tinyrpc {
|
namespace tinyrpc {
|
||||||
class TcpClient;
|
class TcpClient;
|
||||||
class ClientTcpConnection : AbstractTcpConnection {
|
class ClientTcpConnection : public AbstractTcpConnection {
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ClientTcpConnection(int fd, Reactor& reactor, TcpClient& cli);
|
ClientTcpConnection(int fd, Reactor& reactor, TcpClient& cli);
|
||||||
~ClientTcpConnection();
|
~ClientTcpConnection();
|
||||||
|
TcpBuffer& getSendBuffer() {return m_writeBuffer;}
|
||||||
private:
|
bool getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct);
|
||||||
void process() override;
|
void process() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
namespace tinyrpc {
|
namespace tinyrpc {
|
||||||
class TcpServer;
|
class TcpServer;
|
||||||
class ServerTcpConnection : AbstractTcpConnection {
|
class ServerTcpConnection : public AbstractTcpConnection {
|
||||||
|
|
||||||
public:
|
public:
|
||||||
ServerTcpConnection(int fd, Reactor& reactor, TcpServer& ser);
|
ServerTcpConnection(int fd, Reactor& reactor, TcpServer& ser);
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
#include "logger.hpp"
|
||||||
#include <cstddef>
|
#include <cstddef>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
@ -14,7 +15,13 @@ namespace tinyrpc {
|
|||||||
~TcpBuffer() {
|
~TcpBuffer() {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
void reserved(std::size_t spaceSize) { // 预留空间
|
||||||
|
if(getWriteable() <= spaceSize) {
|
||||||
|
|
||||||
|
resize((getReadable() + spaceSize) * 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
void dilatation() {
|
void dilatation() {
|
||||||
resize(m_buffer.size() * 2);
|
resize(m_buffer.size() * 2);
|
||||||
}
|
}
|
||||||
|
@ -2,21 +2,31 @@
|
|||||||
|
|
||||||
#include "abstract_coder.hpp"
|
#include "abstract_coder.hpp"
|
||||||
#include "client_tcp_connect.hpp"
|
#include "client_tcp_connect.hpp"
|
||||||
|
#include "coroutine.hpp"
|
||||||
#include "net_address.hpp"
|
#include "net_address.hpp"
|
||||||
#include "reactor.hpp"
|
#include "reactor.hpp"
|
||||||
|
#include "tinypb_data.hpp"
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
namespace tinyrpc {
|
namespace tinyrpc {
|
||||||
class TcpClient {
|
class TcpClient {
|
||||||
public:
|
public:
|
||||||
TcpClient(const NetAddress& peerAddr);
|
TcpClient(const NetAddress& peerAddr);
|
||||||
AbstractCoder& getCoder() {return *m_coder;}
|
AbstractCoder& getCoder() {return *m_coder;}
|
||||||
|
const NetAddress& getLocalAddr() const {return m_local_addr;}
|
||||||
|
const NetAddress& getPeerAddr() const {return m_peer_addr;}
|
||||||
|
bool writeToSendBuffer(const AbstractData& data);
|
||||||
|
int sendAndRecvData(const std::string& msg_req, std::shared_ptr<AbstractData>& res);
|
||||||
|
void addCoroutine(Coroutine& cor);
|
||||||
|
bool connectToServer();
|
||||||
|
void start();
|
||||||
~TcpClient();
|
~TcpClient();
|
||||||
private:
|
private:
|
||||||
int m_fd{-1};
|
int m_fd{-1};
|
||||||
NetAddress m_local_addr{};
|
NetAddress m_local_addr{};
|
||||||
NetAddress m_peer_addr{};
|
NetAddress m_peer_addr{};
|
||||||
Reactor& m_reactor;
|
Reactor& m_reactor;
|
||||||
ClientTcpConnection *m_connection;
|
std::unique_ptr<ClientTcpConnection> m_connection;
|
||||||
AbstractCoder* m_coder{};
|
AbstractCoder* m_coder{};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
23
includes/net/tinypb/tinypb_channel.hpp
Normal file
23
includes/net/tinypb/tinypb_channel.hpp
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "net_address.hpp"
|
||||||
|
#include "tcp_client.hpp"
|
||||||
|
#include <google/protobuf/service.h>
|
||||||
|
|
||||||
|
namespace tinyrpc {
|
||||||
|
|
||||||
|
class TinypbChannel : public google::protobuf::RpcChannel {
|
||||||
|
public:
|
||||||
|
TinypbChannel(const NetAddress& peerAddr);
|
||||||
|
~TinypbChannel() = default;
|
||||||
|
void CallMethod(const google::protobuf::MethodDescriptor* method,
|
||||||
|
google::protobuf::RpcController* controller,
|
||||||
|
const google::protobuf::Message* request,
|
||||||
|
google::protobuf::Message* response,
|
||||||
|
google::protobuf::Closure* done) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
TcpClient m_client;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
@ -7,7 +7,7 @@ namespace tinyrpc {
|
|||||||
public:
|
public:
|
||||||
TinypbCoder();
|
TinypbCoder();
|
||||||
~TinypbCoder();
|
~TinypbCoder();
|
||||||
bool encoder(TcpBuffer& buffer, AbstractData& data) override; // 编码
|
bool encoder(TcpBuffer& buffer, const AbstractData& data) override; // 编码
|
||||||
bool decoder(TcpBuffer& buffer, AbstractData& data) override; // 解码
|
bool decoder(TcpBuffer& buffer, AbstractData& data) override; // 解码
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
@ -1,13 +1,57 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
|
||||||
#include "abstract_coder.hpp"
|
#include "abstract_coder.hpp"
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <fcntl.h>
|
||||||
|
|
||||||
namespace tinyrpc {
|
namespace tinyrpc {
|
||||||
|
static thread_local std::string t_msg_req_nu;
|
||||||
|
static thread_local std::string t_max_msg_req_nu;
|
||||||
|
static int g_random_fd = -1;
|
||||||
struct TinypbData : public AbstractData {
|
struct TinypbData : public AbstractData {
|
||||||
|
static std::string genMsgNumber()
|
||||||
|
{
|
||||||
|
int t_msg_req_len = 8;
|
||||||
|
|
||||||
|
if (t_msg_req_nu.empty() || t_msg_req_nu == t_max_msg_req_nu) {
|
||||||
|
if (g_random_fd == -1) {
|
||||||
|
g_random_fd = open("/dev/urandom", O_RDONLY);
|
||||||
|
}
|
||||||
|
std::string res(t_msg_req_len, 0);
|
||||||
|
|
||||||
|
if ((read(g_random_fd, &res[0], t_msg_req_len)) != t_msg_req_len) {
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
t_max_msg_req_nu = "";
|
||||||
|
|
||||||
|
for (int i = 0; i < t_msg_req_len; ++i) {
|
||||||
|
uint8_t x = ((uint8_t)(res[i])) % 10;
|
||||||
|
res[i] = x + '0';
|
||||||
|
t_max_msg_req_nu += "9";
|
||||||
|
}
|
||||||
|
|
||||||
|
t_msg_req_nu = res;
|
||||||
|
|
||||||
|
|
||||||
|
} else {
|
||||||
|
int i = t_msg_req_nu.length() - 1;
|
||||||
|
while (t_msg_req_nu[i] == '9' && i >= 0) {
|
||||||
|
i--;
|
||||||
|
}
|
||||||
|
if (i >= 0) {
|
||||||
|
t_msg_req_nu[i] += 1;
|
||||||
|
for (size_t j = i + 1; j < t_msg_req_nu.length(); ++j) {
|
||||||
|
t_msg_req_nu[j] = '0';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return t_msg_req_nu;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
TinypbData() {};
|
TinypbData() {};
|
||||||
~TinypbData() {};
|
~TinypbData() {};
|
||||||
@ -26,6 +70,4 @@ namespace tinyrpc {
|
|||||||
// char end = 0x03; // identify end of a TinyPb protocal data
|
// char end = 0x03; // identify end of a TinyPb protocal data
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
@ -13,16 +13,18 @@ namespace tinyrpc {
|
|||||||
m_reactor(reactor)
|
m_reactor(reactor)
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
void AbstractTcpConnection::addMainTaskToReactor() {
|
||||||
Reactor::Task task = [this] {
|
Reactor::Task task = [this] {
|
||||||
logger() << "conn coroutine is resume";
|
logger() << "conn coroutine is resume";
|
||||||
m_mainCoroutine.resume();
|
m_mainCoroutine.resume();
|
||||||
};
|
};
|
||||||
|
|
||||||
reactor.addTask(task, true);
|
m_reactor.addTask(task, true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void AbstractTcpConnection::mainLoopFun() {
|
void AbstractTcpConnection::mainLoopFun() {
|
||||||
while(m_state == State::Connected) {
|
while(m_state == State::Connected) {
|
||||||
input();
|
input();
|
||||||
|
@ -35,4 +35,12 @@ namespace tinyrpc {
|
|||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ClientTcpConnection::getResPackageData(const std::string& msg_req, std::shared_ptr<AbstractData>& pb_struct) {
|
||||||
|
auto it = m_respond_datas.find(msg_req);
|
||||||
|
if(it == m_respond_datas.end()) return false;
|
||||||
|
pb_struct = it->second;
|
||||||
|
m_respond_datas.erase(it);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
@ -25,9 +25,8 @@ namespace tinyrpc {
|
|||||||
|
|
||||||
|
|
||||||
void IOThread::addClient(TcpServer* ser, int fd) {
|
void IOThread::addClient(TcpServer* ser, int fd) {
|
||||||
|
|
||||||
m_clients[fd] = std::shared_ptr<ServerTcpConnection>(new ServerTcpConnection(fd, *m_reactor, *ser));
|
m_clients[fd] = std::shared_ptr<ServerTcpConnection>(new ServerTcpConnection(fd, *m_reactor, *ser));
|
||||||
|
m_clients[fd]->addMainTaskToReactor();
|
||||||
}
|
}
|
||||||
|
|
||||||
void IOThread::mainFunc() {
|
void IOThread::mainFunc() {
|
||||||
|
@ -22,7 +22,7 @@ namespace tinyrpc {
|
|||||||
bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
|
bool ret = m_server.getCoder().decoder(m_readBuffer, *data);
|
||||||
|
|
||||||
if(ret == false) {
|
if(ret == false) {
|
||||||
logger() << "decode error";
|
logger() << "decode uncompleted";
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,5 +1,8 @@
|
|||||||
#include "tcp_client.hpp"
|
#include "tcp_client.hpp"
|
||||||
|
#include "abstract_tcp_connect.hpp"
|
||||||
#include "client_tcp_connect.hpp"
|
#include "client_tcp_connect.hpp"
|
||||||
|
#include "coroutine.hpp"
|
||||||
|
#include "error_code.hpp"
|
||||||
#include "fd_event.hpp"
|
#include "fd_event.hpp"
|
||||||
#include "logger.hpp"
|
#include "logger.hpp"
|
||||||
#include "net_address.hpp"
|
#include "net_address.hpp"
|
||||||
@ -11,6 +14,7 @@
|
|||||||
namespace tinyrpc {
|
namespace tinyrpc {
|
||||||
|
|
||||||
TcpClient::TcpClient(const NetAddress& peerAddr) :
|
TcpClient::TcpClient(const NetAddress& peerAddr) :
|
||||||
|
m_local_addr("127.0.0.1", 0),
|
||||||
m_peer_addr(peerAddr),
|
m_peer_addr(peerAddr),
|
||||||
m_reactor(*Reactor::getReactor())
|
m_reactor(*Reactor::getReactor())
|
||||||
|
|
||||||
@ -19,9 +23,9 @@ namespace tinyrpc {
|
|||||||
if (m_fd == -1) {
|
if (m_fd == -1) {
|
||||||
logger() << "call socket error, fd=-1, sys error=" << strerror(errno);
|
logger() << "call socket error, fd=-1, sys error=" << strerror(errno);
|
||||||
}
|
}
|
||||||
m_local_addr = NetAddress("127.0.0.1", 0);
|
|
||||||
m_coder = new TinypbCoder();
|
m_coder = new TinypbCoder();
|
||||||
m_connection = new ClientTcpConnection(m_fd, m_reactor, *this);
|
// m_connection = new ClientTcpConnection(m_fd, m_reactor, *this);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -29,7 +33,56 @@ namespace tinyrpc {
|
|||||||
m_reactor.delFdEvent(FdEventPool::getInstance()->getFdEvent(m_fd));
|
m_reactor.delFdEvent(FdEventPool::getInstance()->getFdEvent(m_fd));
|
||||||
if(m_fd != -1) close(m_fd);
|
if(m_fd != -1) close(m_fd);
|
||||||
delete m_coder;
|
delete m_coder;
|
||||||
delete m_connection;
|
}
|
||||||
|
bool TcpClient::connectToServer() {
|
||||||
|
if(m_connection.get() == nullptr || m_connection->getState() == ClientTcpConnection::State::Disconnected) {
|
||||||
|
int ret = connect(m_fd, m_peer_addr.getSockaddr(), m_peer_addr.getSockLen());
|
||||||
|
if(ret == -1) return false;
|
||||||
|
m_connection.reset(new ClientTcpConnection(m_fd, m_reactor, *this));
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool TcpClient::writeToSendBuffer(const AbstractData& data) {
|
||||||
|
connectToServer();
|
||||||
|
return m_coder->encoder(m_connection->getSendBuffer(), data);
|
||||||
|
}
|
||||||
|
|
||||||
|
int TcpClient::sendAndRecvData(const std::string& msg_req, std::shared_ptr<AbstractData>& res) {
|
||||||
|
|
||||||
|
if(!connectToServer()) {
|
||||||
|
logger() << "error1";
|
||||||
|
return ERROR_FAILED_CONNECT;
|
||||||
|
}
|
||||||
|
m_connection->output();
|
||||||
|
|
||||||
|
if(m_connection->getState() == ClientTcpConnection::State::Disconnected) {
|
||||||
|
logger() << "error1";
|
||||||
|
return ERROR_FAILED_GET_REPLY;
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
m_connection->input();
|
||||||
|
m_connection->process();
|
||||||
|
if(m_connection->getState() != ClientTcpConnection::State::Connected) {
|
||||||
|
logger() << "error1";
|
||||||
|
return ERROR_FAILED_GET_REPLY;
|
||||||
|
}
|
||||||
|
}while(!m_connection->getResPackageData(msg_req, res));
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void TcpClient::addCoroutine(Coroutine& cor) {
|
||||||
|
Reactor::Task task = [&cor] {
|
||||||
|
cor.resume();
|
||||||
|
};
|
||||||
|
|
||||||
|
m_reactor.addTask(task);
|
||||||
|
}
|
||||||
|
void TcpClient::start() {
|
||||||
|
Coroutine::getMainCoroutine();
|
||||||
|
m_reactor.loop();
|
||||||
|
}
|
||||||
}
|
}
|
77
src/net/tinypb/tinypb_channel.cc
Normal file
77
src/net/tinypb/tinypb_channel.cc
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
#include "tinypb_channel.hpp"
|
||||||
|
#include "error_code.hpp"
|
||||||
|
#include "logger.hpp"
|
||||||
|
#include "tinypb_controller.hpp"
|
||||||
|
#include "tinypb_data.hpp"
|
||||||
|
#include <google/protobuf/message.h>
|
||||||
|
#include <memory>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
namespace tinyrpc {
|
||||||
|
TinypbChannel::TinypbChannel(const NetAddress& peerAddr) :
|
||||||
|
m_client(peerAddr)
|
||||||
|
{
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void TinypbChannel::CallMethod(const google::protobuf::MethodDescriptor* method,
|
||||||
|
google::protobuf::RpcController* controller,
|
||||||
|
const google::protobuf::Message* request,
|
||||||
|
google::protobuf::Message* response,
|
||||||
|
google::protobuf::Closure* done)
|
||||||
|
{
|
||||||
|
|
||||||
|
TinypbController* rpc_controller = dynamic_cast<TinypbController*>(controller);
|
||||||
|
rpc_controller->SetLocalAddr(m_client.getLocalAddr());
|
||||||
|
rpc_controller->SetPeerAddr(m_client.getPeerAddr());
|
||||||
|
|
||||||
|
std::unique_ptr<TinypbData> data(new TinypbData);
|
||||||
|
|
||||||
|
data->service_full_name = method->full_name();
|
||||||
|
|
||||||
|
if (!request->SerializeToString(&(data->pb_data))) {
|
||||||
|
logger() << "serialize send package error";
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
data->msg_req = TinypbData::genMsgNumber();
|
||||||
|
data->msg_req_len = data->msg_req.length();
|
||||||
|
rpc_controller->SetMsgReq(data->msg_req);
|
||||||
|
|
||||||
|
m_client.writeToSendBuffer(*data);
|
||||||
|
|
||||||
|
std::shared_ptr<AbstractData> res;
|
||||||
|
|
||||||
|
int ret = m_client.sendAndRecvData(data->msg_req, res);
|
||||||
|
|
||||||
|
if(ret != 0) {
|
||||||
|
rpc_controller->SetError(ret, "sendAndRecvData err");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::shared_ptr<TinypbData> res_data = std::dynamic_pointer_cast<TinypbData>(res);
|
||||||
|
// CONTINUE
|
||||||
|
if(!response->ParseFromString(res_data->pb_data)) {
|
||||||
|
rpc_controller->SetError(ERROR_FAILED_DESERIALIZE, "failed to deserialize data from server");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(res_data->err_code != 0) {
|
||||||
|
rpc_controller->SetError(res_data->err_code, res_data->err_info);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
logger() << "============================================================";
|
||||||
|
logger() << data->msg_req << "|" << rpc_controller->PeerAddr().toString()
|
||||||
|
<< "|call rpc server [" << data->service_full_name << "] succ"
|
||||||
|
<< ". Get server reply response data:" << response->ShortDebugString();
|
||||||
|
logger() << "============================================================";
|
||||||
|
|
||||||
|
if(done) done->Run();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
@ -1,12 +1,11 @@
|
|||||||
#include "tinypb_coder.hpp"
|
#include "tinypb_coder.hpp"
|
||||||
#include "abstract_coder.hpp"
|
#include "abstract_coder.hpp"
|
||||||
|
#include "logger.hpp"
|
||||||
#include "tinypb_data.hpp"
|
#include "tinypb_data.hpp"
|
||||||
#include <cstdint>
|
#include <cstdint>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <new>
|
|
||||||
#include <vector>
|
|
||||||
|
|
||||||
|
|
||||||
namespace tinyrpc {
|
namespace tinyrpc {
|
||||||
@ -16,13 +15,16 @@ namespace tinyrpc {
|
|||||||
// static const int MSG_REQ_LEN = 20; // default length of msg_req
|
// static const int MSG_REQ_LEN = 20; // default length of msg_req
|
||||||
|
|
||||||
TinypbCoder::TinypbCoder() {
|
TinypbCoder::TinypbCoder() {
|
||||||
// TODO
|
|
||||||
}
|
}
|
||||||
TinypbCoder::~TinypbCoder() {
|
TinypbCoder::~TinypbCoder() {
|
||||||
// TODO
|
|
||||||
}
|
}
|
||||||
bool TinypbCoder::encoder(TcpBuffer& buffer, AbstractData& data) {
|
bool TinypbCoder::encoder(TcpBuffer& buffer, const AbstractData& data) {
|
||||||
TinypbData& pbdata = dynamic_cast<TinypbData&>(data);
|
logger() << "encoder";
|
||||||
|
const TinypbData& pbdata = dynamic_cast<const TinypbData&>(data);
|
||||||
|
//TinypbData->encode_succ = false; TODO?
|
||||||
|
|
||||||
if(pbdata.msg_req.empty()) return false;
|
if(pbdata.msg_req.empty()) return false;
|
||||||
if(pbdata.service_full_name.empty()) return false;
|
if(pbdata.service_full_name.empty()) return false;
|
||||||
if(pbdata.pb_data.empty()) return false;
|
if(pbdata.pb_data.empty()) return false;
|
||||||
@ -52,7 +54,7 @@ namespace tinyrpc {
|
|||||||
int32_t service_name_len_net = htonl(service_name_len);
|
int32_t service_name_len_net = htonl(service_name_len);
|
||||||
memcpy(&buf[cur_index], &service_name_len_net, sizeof(service_name_len_net));
|
memcpy(&buf[cur_index], &service_name_len_net, sizeof(service_name_len_net));
|
||||||
cur_index += sizeof(service_name_len_net);
|
cur_index += sizeof(service_name_len_net);
|
||||||
memcpy(&buf[cur_index], pbdata.msg_req.c_str(), service_name_len);
|
memcpy(&buf[cur_index], pbdata.service_full_name.c_str(), service_name_len);
|
||||||
cur_index += service_name_len;
|
cur_index += service_name_len;
|
||||||
|
|
||||||
int32_t err_code = pbdata.err_code;
|
int32_t err_code = pbdata.err_code;
|
||||||
@ -67,25 +69,28 @@ namespace tinyrpc {
|
|||||||
memcpy(&buf[cur_index], pbdata.err_info.c_str(), err_info_len);
|
memcpy(&buf[cur_index], pbdata.err_info.c_str(), err_info_len);
|
||||||
cur_index += err_info_len;
|
cur_index += err_info_len;
|
||||||
|
|
||||||
int32_t pb_data_len = pbdata.err_info.length();
|
int32_t pb_data_len = pbdata.pb_data.length();
|
||||||
memcpy(&buf[cur_index], pbdata.pb_data.c_str(), pb_data_len);
|
memcpy(&buf[cur_index], pbdata.pb_data.c_str(), pb_data_len);
|
||||||
cur_index += pb_data_len;
|
cur_index += pb_data_len;
|
||||||
|
|
||||||
int32_t check_num = 1;
|
int32_t check_num = 1; // checksum has not been implemented yet, directly skip chcksum
|
||||||
int32_t check_num_net = htonl(check_num);
|
int32_t check_num_net = htonl(check_num);
|
||||||
memcpy(&buf[cur_index], &check_num_net, sizeof(check_num_net));
|
memcpy(&buf[cur_index], &check_num_net, sizeof(check_num_net));
|
||||||
cur_index += sizeof(check_num_net);
|
cur_index += sizeof(check_num_net);
|
||||||
|
|
||||||
buf[cur_index++] = PB_END;
|
buf[cur_index++] = PB_END;
|
||||||
|
|
||||||
memcpy(buffer.getWriteAddress(), buf.get(), pk_len);
|
buffer.reserved(pk_len);
|
||||||
|
|
||||||
|
memcpy(buffer.getWriteAddress(), buf.get(), pk_len); // 预留
|
||||||
buffer.writeOffset(pk_len);
|
buffer.writeOffset(pk_len);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TinypbCoder::decoder(TcpBuffer& buffer, AbstractData& data) {
|
bool TinypbCoder::decoder(TcpBuffer& buffer, AbstractData& data) {
|
||||||
|
logger() << "decoder";
|
||||||
char* buff = static_cast<char*>(buffer.getReadAddress());
|
char* buff = static_cast<char*>(buffer.getReadAddress());
|
||||||
|
|
||||||
int start_index = -1;
|
int start_index = -1;
|
||||||
@ -94,30 +99,33 @@ namespace tinyrpc {
|
|||||||
int pack_len = -1;
|
int pack_len = -1;
|
||||||
|
|
||||||
for(int i = 0; i < static_cast<int>(buffer.getReadable()); i++) {
|
for(int i = 0; i < static_cast<int>(buffer.getReadable()); i++) {
|
||||||
if(buff[i] == PB_START) {
|
if(buff[i] != PB_START) continue;
|
||||||
|
|
||||||
|
|
||||||
if(i + 1 >= static_cast<int>(buffer.getReadable())) {
|
if(i + 1 >= static_cast<int>(buffer.getReadable())) {
|
||||||
return false; // 包不完整
|
return false; // 包不完整
|
||||||
}
|
}
|
||||||
|
|
||||||
pack_len = getInt32FromNetByte(buff[i + 1]);
|
pack_len = getInt32FromNetByte(buff[i + 1]);
|
||||||
end_index = pack_len + i - 1;
|
|
||||||
|
|
||||||
|
end_index = pack_len + i - 1;
|
||||||
if(end_index >= static_cast<int>(buffer.getReadable())) {
|
if(end_index >= static_cast<int>(buffer.getReadable())) {
|
||||||
continue;
|
continue; // 不符合格式
|
||||||
}
|
}
|
||||||
|
|
||||||
if(buff[end_index] == PB_END) {
|
|
||||||
|
if(buff[end_index] != PB_END) {
|
||||||
|
|
||||||
|
continue; // 不符合格式
|
||||||
|
}
|
||||||
isFullPack = true;
|
isFullPack = true;
|
||||||
start_index = i;
|
start_index = i;
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if(isFullPack == false) {
|
if(isFullPack == false) {
|
||||||
|
|
||||||
return false; // 包不完整
|
return false; // 包不完整
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -207,7 +215,7 @@ namespace tinyrpc {
|
|||||||
pbdata.pb_data = std::string(&buff[cur_index], pb_data_len);
|
pbdata.pb_data = std::string(&buff[cur_index], pb_data_len);
|
||||||
cur_index += pb_data_len;
|
cur_index += pb_data_len;
|
||||||
|
|
||||||
buffer.readOffset(cur_index - start_index);
|
buffer.readOffset(end_index + 1);
|
||||||
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -72,7 +72,7 @@ namespace tinyrpc {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<Message> respondMsg (service->GetRequestPrototype(method).New());
|
std::unique_ptr<Message> respondMsg (service->GetResponsePrototype(method).New());
|
||||||
|
|
||||||
auto callback = [&respond, &respondMsg] {
|
auto callback = [&respond, &respondMsg] {
|
||||||
if(!respondMsg->SerializePartialToString(&respond.pb_data)) {
|
if(!respondMsg->SerializePartialToString(&respond.pb_data)) {
|
||||||
|
66
test/clienttest/clienttest.cc
Normal file
66
test/clienttest/clienttest.cc
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
#include "coroutine.hpp"
|
||||||
|
#include "logger.hpp"
|
||||||
|
#include "net_address.hpp"
|
||||||
|
#include "reactor.hpp"
|
||||||
|
#include "tcp_client.hpp"
|
||||||
|
#include "tcp_server.hpp"
|
||||||
|
#include "tinypb.pb.h"
|
||||||
|
#include "tinypb_channel.hpp"
|
||||||
|
#include "tinypb_closure.hpp"
|
||||||
|
#include "tinypb_controller.hpp"
|
||||||
|
#include <iostream>
|
||||||
|
using namespace std;
|
||||||
|
using namespace tinyrpc;
|
||||||
|
|
||||||
|
int n = 10;
|
||||||
|
|
||||||
|
void test()
|
||||||
|
{
|
||||||
|
NetAddress addr("127.0.0.1", 9001);
|
||||||
|
TinypbChannel channel(addr);
|
||||||
|
while (n--) {
|
||||||
|
logger() << "============== test no:" << n << "===============";
|
||||||
|
|
||||||
|
queryNameReq req_name;
|
||||||
|
req_name.set_req_no(20220315);
|
||||||
|
req_name.set_id(1100110001);
|
||||||
|
req_name.set_type(1);
|
||||||
|
queryNameRes res_name;
|
||||||
|
|
||||||
|
queryAgeReq req_age;
|
||||||
|
req_age.set_req_no(00001111);
|
||||||
|
req_age.set_id(6781);
|
||||||
|
queryAgeRes res_age;
|
||||||
|
|
||||||
|
TinypbClosure cb([]() {
|
||||||
|
logger() << "==========================";
|
||||||
|
logger() << "succ call rpc";
|
||||||
|
logger() << "==========================";
|
||||||
|
});
|
||||||
|
|
||||||
|
QueryService_Stub stub(&channel);
|
||||||
|
TinypbController rpc_controller;
|
||||||
|
|
||||||
|
stub.query_name(&rpc_controller, &req_name, &res_name, &cb);
|
||||||
|
|
||||||
|
if (rpc_controller.ErrorCode() != 0) {
|
||||||
|
logger() << "call rpc method query_name failed, errcode=" << rpc_controller.ErrorCode() << ",error=" << rpc_controller.ErrorText();
|
||||||
|
}
|
||||||
|
if (res_name.ret_code() != 0) {
|
||||||
|
logger() << "query name error, errcode=" << res_name.ret_code() << ", res_info=" << res_name.res_info();
|
||||||
|
} else {
|
||||||
|
logger() << "get res_name.age = " << res_name.name();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int main()
|
||||||
|
{
|
||||||
|
// TcpServer server;
|
||||||
|
TcpClient client(NetAddress("127.0.0.1", 9001));
|
||||||
|
Coroutine cor(test);
|
||||||
|
client.addCoroutine(cor);
|
||||||
|
client.start();
|
||||||
|
return 0;
|
||||||
|
}
|
38
test/clienttest/tinypb.proto
Normal file
38
test/clienttest/tinypb.proto
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
syntax = "proto3";
|
||||||
|
option cc_generic_services = true;
|
||||||
|
|
||||||
|
message queryAgeReq {
|
||||||
|
int32 req_no = 1;
|
||||||
|
int32 id = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message queryAgeRes {
|
||||||
|
int32 ret_code = 1;
|
||||||
|
string res_info = 2;
|
||||||
|
int32 req_no = 3;
|
||||||
|
int32 id = 4;
|
||||||
|
int32 age = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
message queryNameReq {
|
||||||
|
int32 req_no = 1;
|
||||||
|
int32 id = 2;
|
||||||
|
int32 type = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message queryNameRes {
|
||||||
|
int32 ret_code = 1;
|
||||||
|
string res_info = 2;
|
||||||
|
int32 req_no = 3;
|
||||||
|
int32 id = 4;
|
||||||
|
string name = 5;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
service QueryService {
|
||||||
|
// rpc method name
|
||||||
|
rpc query_name(queryNameReq) returns (queryNameRes);
|
||||||
|
|
||||||
|
// rpc method name
|
||||||
|
rpc query_age(queryAgeReq) returns (queryAgeRes);
|
||||||
|
}
|
27
test/codertest/main.cc
Normal file
27
test/codertest/main.cc
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
#include <iostream>
|
||||||
|
#include "tinypb_coder.hpp"
|
||||||
|
#include "tinypb_data.hpp"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
using namespace tinyrpc;
|
||||||
|
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
TinypbData data;
|
||||||
|
data.msg_req = "11231231312";
|
||||||
|
data.pb_data = "121231233456";
|
||||||
|
data.service_full_name = "aaa.b12313bb";
|
||||||
|
TinypbCoder coder;
|
||||||
|
TcpBuffer buffer;
|
||||||
|
TinypbData newdata;
|
||||||
|
coder.encoder(buffer, data);
|
||||||
|
coder.decoder(buffer, newdata);
|
||||||
|
|
||||||
|
|
||||||
|
cout << newdata.msg_req << endl;
|
||||||
|
cout << newdata.pb_data << endl;
|
||||||
|
cout << newdata.service_full_name << endl;
|
||||||
|
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Binary file not shown.
@ -1,22 +0,0 @@
|
|||||||
#include <iostream>
|
|
||||||
#include "reactor.hpp"
|
|
||||||
#include "tcp_server.hpp"
|
|
||||||
using namespace std;
|
|
||||||
using namespace tinyrpc;
|
|
||||||
void test() {
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int main() {
|
|
||||||
TcpServer server;
|
|
||||||
|
|
||||||
// server.
|
|
||||||
Reactor::getReactor()->addTask(test);
|
|
||||||
server.start();
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
@ -16,12 +16,17 @@ public:
|
|||||||
::queryNameRes* response,
|
::queryNameRes* response,
|
||||||
::google::protobuf::Closure* done) override
|
::google::protobuf::Closure* done) override
|
||||||
{
|
{
|
||||||
|
|
||||||
logger() << "QueryServiceImpl.query_name, req={" << request->ShortDebugString() << "}";
|
logger() << "QueryServiceImpl.query_name, req={" << request->ShortDebugString() << "}";
|
||||||
|
logger() << response;
|
||||||
response->set_ret_code(0);
|
response->set_ret_code(0);
|
||||||
|
|
||||||
response->set_res_info("OK");
|
response->set_res_info("OK");
|
||||||
|
|
||||||
response->set_req_no(request->req_no());
|
response->set_req_no(request->req_no());
|
||||||
response->set_id(request->id());
|
response->set_id(request->id());
|
||||||
response->set_name("yyy");
|
response->set_name("yyy");
|
||||||
|
|
||||||
done->Run();
|
done->Run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user