Skip to content
16 changes: 13 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ OPT ?= -O2 # (A) Production use (optimized mode)
include depends.mk

LIB=libsofa-pbrpc.a
LIB_FULL=libsofa-pbrpc-full.a
LIB_SRC=$(wildcard src/sofa/pbrpc/*.cc)
PLUGIN_SRC=$(wildcard src/sofa/pbrpc/plugin/*/*.cc)
LIB_OBJ=$(patsubst %.cc,%.o,$(LIB_SRC))
PLUGIN_OBJ=$(patsubst %.cc,%.o,$(PLUGIN_SRC))
PROTO=$(wildcard src/sofa/pbrpc/*.proto)
PROTO_SRC=$(patsubst %.proto,%.pb.cc,$(PROTO))
PROTO_HEADER=$(patsubst %.proto,%.pb.h,$(PROTO))
Expand All @@ -45,6 +48,7 @@ PUB_INC=src/sofa/pbrpc/pbrpc.h src/sofa/pbrpc/closure_helper.h src/sofa/pbrpc/cl
src/sofa/pbrpc/fast_lock.h src/sofa/pbrpc/rw_lock.h src/sofa/pbrpc/scoped_locker.h \
src/sofa/pbrpc/condition_variable.h src/sofa/pbrpc/wait_event.h src/sofa/pbrpc/http.h \
src/sofa/pbrpc/buffer.h src/sofa/pbrpc/buf_handle.h src/sofa/pbrpc/profiling_linker.h \
src/sofa/pbrpc/rpc_attachment.h \
$(PROTO) $(PROTO_HEADER)

#-----------------------------------------------
Expand Down Expand Up @@ -84,7 +88,7 @@ check_depends:
@if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi

clean:
rm -f $(LIB) $(BIN) $(LIB_OBJ) $(PROTO_OBJ) $(BIN_OBJ) $(PROTO_HEADER) $(PROTO_SRC)
rm -f $(LIB) $(LIB_FULL) $(BIN) $(LIB_OBJ) $(PLUGIN_OBJ) $(PROTO_OBJ) $(BIN_OBJ) $(PROTO_HEADER) $(PROTO_SRC)

rebuild: clean all

Expand All @@ -95,6 +99,9 @@ $(LIB_OBJ): $(PROTO_HEADER)
$(LIB): $(LIB_OBJ) $(PROTO_OBJ)
ar crs $@ $(LIB_OBJ) $(PROTO_OBJ)

$(LIB_FULL): $(PLUGIN_OBJ) $(LIB_OBJ) $(PROTO_OBJ)
ar crs $@ $(PLUGIN_OBJ) $(LIB_OBJ) $(PROTO_OBJ)

$(BIN): $(LIB) $(BIN_OBJ)
$(CXX) $(BIN_OBJ) -o $@ $(LIB) $(LDFLAGS)

Expand All @@ -104,19 +111,22 @@ $(BIN): $(LIB) $(BIN_OBJ)
%.o: %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@

build: $(LIB) $(BIN)
build: $(LIB) $(LIB_FULL) $(BIN)
@echo
@echo 'Build succeed, run "make install" to install sofa-pbrpc to "'$(PREFIX)'".'

install: $(LIB) $(BIN)
install: $(LIB) $(LIB_FULL) $(BIN)
mkdir -p $(PREFIX)/include/sofa/pbrpc
cp -r $(PUB_INC) $(TARGET_DIRECTORY) $(PREFIX)/include/sofa/pbrpc/
mkdir -p $(PREFIX)/include/sofa/pbrpc/smart_ptr
cp src/sofa/pbrpc/smart_ptr/*.hpp $(PREFIX)/include/sofa/pbrpc/smart_ptr
mkdir -p $(PREFIX)/include/sofa/pbrpc/smart_ptr/detail
cp src/sofa/pbrpc/smart_ptr/detail/*.hpp $(PREFIX)/include/sofa/pbrpc/smart_ptr/detail
mkdir -p $(PREFIX)/include/sofa/pbrpc/plugin/cookie
cp src/sofa/pbrpc/plugin/cookie/*.h $(PREFIX)/include/sofa/pbrpc/plugin/cookie
mkdir -p $(PREFIX)/lib
cp $(LIB) $(PREFIX)/lib/
cp $(LIB_FULL) $(PREFIX)/lib/
mkdir -p $(PREFIX)/bin
cp $(BIN) $(PREFIX)/bin/
@echo
Expand Down
4 changes: 2 additions & 2 deletions sample/echo/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ INCPATH=-I. -I$(SOFA_PBRPC)/include -I$(PROTOBUF_DIR)/include \
-I$(SNAPPY_DIR)/include -I$(ZLIB_DIR)/include
CXXFLAGS += $(OPT) -pipe -W -Wall -fPIC -D_GNU_SOURCE -D__STDC_LIMIT_MACROS $(INCPATH)

LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a
LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc-full.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a
LDFLAGS += -L$(ZLIB_DIR)/lib -lpthread -lz

UNAME_S := $(shell uname -s)
Expand All @@ -57,7 +57,7 @@ check_depends:
@if [ ! -f "$(SNAPPY_DIR)/include/snappy.h" ]; then echo "ERROR: need snappy header"; exit 1; fi
@if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/include/sofa/pbrpc/pbrpc.h" ]; then echo "ERROR: need sofa-pbrpc header"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc.a" ]; then echo "ERROR: need sofa-pbrpc lib"; exit 1; fi
@if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc-full.a" ]; then echo "ERROR: need sofa-pbrpc-full lib"; exit 1; fi

clean:
@rm -f $(BIN) *.o *.pb.*
Expand Down
21 changes: 20 additions & 1 deletion sample/echo/client_async.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

#include <unistd.h>
#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::RpcCookie> RpcCookiePtr;
sofa::pbrpc::RpcCookieManager g_cookie_manager;

void EchoCallback(sofa::pbrpc::RpcController* cntl,
sofa::pbrpc::test::EchoRequest* request,
sofa::pbrpc::test::EchoResponse* response,
Expand All @@ -24,8 +28,17 @@ void EchoCallback(sofa::pbrpc::RpcController* cntl,
if (cntl->Failed()) {
SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str());
}
else {
else
{
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager));
if (cntl->GetResponseAttachment(cookie.get()))
{
std::string version;
cookie->Get("version", version);
SLOG(NOTICE, "cookie version=%s", version.c_str());
cookie->Store();
}
}

delete cntl;
Expand All @@ -46,9 +59,15 @@ int main()
sofa::pbrpc::RpcChannelOptions channel_options;
sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321", channel_options);

RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager));
cookie->Load();
cookie->Set("type", "async");
cookie->Set("logid", "123456");

// Prepare parameters.
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
cntl->SetRequestAttachment(cookie.get());
sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse();
Expand Down
17 changes: 17 additions & 0 deletions sample/echo/client_sync.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
// Author: qinzuoyan01@baidu.com (Qin Zuoyan)

#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::RpcCookie> RpcCookiePtr;
sofa::pbrpc::RpcCookieManager g_cookie_manager;

// Using global RpcClient object can help share resources such as threads and buffers.
sofa::pbrpc::RpcClient g_rpc_client;

Expand All @@ -21,6 +25,11 @@ int main()
// Prepare parameters.
sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController();
cntl->SetTimeout(3000);
RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager));
cookie->Load();
cookie->Set("type", "sync");
cookie->Set("logid", "123456");
cntl->SetRequestAttachment(cookie.get());
sofa::pbrpc::test::EchoRequest* request =
new sofa::pbrpc::test::EchoRequest();
request->set_message("Hello from qinzuoyan01");
Expand Down Expand Up @@ -50,6 +59,14 @@ int main()
else
{
SLOG(NOTICE, "request succeed: %s", response->message().c_str());
cookie.reset(new sofa::pbrpc::RpcCookie(&g_cookie_manager));
if (cntl->GetResponseAttachment(cookie.get()))
{
std::string version;
cookie->Get("version", version);
SLOG(NOTICE, "cookie version=%s", version.c_str());
cookie->Store();
}
}

// Destroy objects.
Expand Down
14 changes: 14 additions & 0 deletions sample/echo/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
#include <signal.h>
#include <unistd.h>
#include <sofa/pbrpc/pbrpc.h>
#include <sofa/pbrpc/plugin/cookie/rpc_cookie.h>
#include "echo_service.pb.h"

typedef sofa::pbrpc::shared_ptr<sofa::pbrpc::RpcCookie> RpcCookiePtr;

bool WebServlet(const sofa::pbrpc::HTTPRequest& request, sofa::pbrpc::HTTPResponse& response)
{
SLOG(INFO, "WebServlet(): request message from %s:%u",
Expand Down Expand Up @@ -53,6 +56,17 @@ class EchoServerImpl : public sofa::pbrpc::test::EchoServer
SLOG(INFO, "Header[\"%s\"]=\"%s\"", it->first.c_str(), it->second.c_str());
}
}
RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie());
if (cntl->GetRequestAttachment(cookie.get()))
{
std::string type;
std::string logid;
cookie->Get("type", type);
cookie->Get("logid", logid);
SLOG(INFO, "cookie info : type=%s, logid=%s", type.c_str(), logid.c_str());
}
cookie->Set("version", "1.00");
cntl->SetResponseAttachment(cookie.get());
response->set_message("echo message: " + request->message());
done->Run();
}
Expand Down
48 changes: 45 additions & 3 deletions src/sofa/pbrpc/binary_rpc_request.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,48 @@ void BinaryRpcRequest::ProcessRequest(
bool parse_request_return = false;
if (compress_type == CompressTypeNone)
{
parse_request_return = request->ParseFromZeroCopyStream(_req_body.get());
parse_request_return =
request->ParseFromBoundedZeroCopyStream(_req_body.get(), _req_header.data_size);
}
else
{
ReadBufferPtr read_buffer(new ReadBuffer());
int bytes_read = 0;
while (bytes_read < _req_header.data_size)
{
const char* read_pos = NULL;
int cur_size;
int bytes_remain = _req_header.data_size - bytes_read;
int handle_offset = _req_body->CurrentHandleOffset();
if (!_req_body->Next(reinterpret_cast<const void**>(&read_pos), &cur_size))
{
#if defined( LOG )
LOG(ERROR) << "ProcessRequest(): " << RpcEndpointToString(_remote_endpoint)
<< ": {" << _req_meta.sequence_id() << "}"
<< ": bad request buffer";
#else
SLOG(ERROR, "ProcessRequest(): %s: {%lu}: bad request buffer",
RpcEndpointToString(_remote_endpoint).c_str(),
_req_meta.sequence_id());
#endif
SendFailedResponse(stream, RPC_ERROR_PARSE_REQUEST_MESSAGE, "bad request buffer");
return;
}
char* handle_data = const_cast<char*>(read_pos) - handle_offset;
if (bytes_remain >= cur_size)
{
read_buffer->Append(BufHandle(handle_data, cur_size, handle_offset));
bytes_read += cur_size;
}
else
{
_req_body->BackUp(cur_size - bytes_remain);
read_buffer->Append(BufHandle(handle_data, bytes_remain, handle_offset));
bytes_read += bytes_remain;
}
}
sofa::pbrpc::scoped_ptr<AbstractCompressedInputStream> is(
get_compressed_input_stream(_req_body.get(), compress_type));
get_compressed_input_stream(read_buffer.get(), compress_type));
parse_request_return = request->ParseFromZeroCopyStream(is.get());
}
if (!parse_request_return)
Expand Down Expand Up @@ -123,6 +159,9 @@ void BinaryRpcRequest::ProcessRequest(
cntl->SetResponseCompressType(_req_meta.has_expected_response_compress_type() ?
_req_meta.expected_response_compress_type() : CompressTypeNone);

cntl->SetRequestSize(_req_header.data_size);
cntl->SetRequestAttachBuffer(_req_body);

CallMethod(method_board, controller, request, response);
}

Expand Down Expand Up @@ -170,7 +209,10 @@ ReadBufferPtr BinaryRpcRequest::AssembleSucceedResponse(
return ReadBufferPtr();
}
header.data_size = write_buffer.ByteCount() - header_pos - header_size - header.meta_size;
header.message_size = header.meta_size + header.data_size;
write_buffer.Append(cntl->GetResponseAttachBuffer()->ToString());
int attach_size = write_buffer.ByteCount() - header_pos - header_size - header.meta_size - header.data_size;
header.message_size = header.meta_size + header.data_size + attach_size;

write_buffer.SetData(header_pos, reinterpret_cast<const char*>(&header), header_size);

ReadBufferPtr read_buffer(new ReadBuffer());
Expand Down
5 changes: 5 additions & 0 deletions src/sofa/pbrpc/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ bool ReadBuffer::Next(const void** data, int* size)
}
}

int ReadBuffer::CurrentHandleOffset()
{
return _cur_it->offset + _cur_pos;
}

// BackUp() can only be called after a successful Next().
// "count" should be greater than or equal to 0.
void ReadBuffer::BackUp(int count)
Expand Down
4 changes: 4 additions & 0 deletions src/sofa/pbrpc/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ class ReadBuffer : public google::protobuf::io::ZeroCopyInputStream

// implements ZeroCopyInputStream ----------------------------------
bool Next(const void** data, int* size);

// Get the offset of current buffer handle.
int CurrentHandleOffset();

void BackUp(int count);
bool Skip(int count);
int64 ByteCount() const;
Expand Down
Loading