diff --git a/Makefile b/Makefile index 402f1c0..7763a4d 100644 --- a/Makefile +++ b/Makefile @@ -24,8 +24,11 @@ OPT ?= -O2 # (A) Production use (optimized mode) include depends.mk LIB=libsofa-pbrpc.a +LIB_FULL=libsofa-pbrpc-full.a LIB_SRC=$(wildcard src/sofa/pbrpc/*.cc) +PLUGIN_SRC=$(wildcard src/sofa/pbrpc/plugin/*/*.cc) LIB_OBJ=$(patsubst %.cc,%.o,$(LIB_SRC)) +PLUGIN_OBJ=$(patsubst %.cc,%.o,$(PLUGIN_SRC)) PROTO=$(wildcard src/sofa/pbrpc/*.proto) PROTO_SRC=$(patsubst %.proto,%.pb.cc,$(PROTO)) PROTO_HEADER=$(patsubst %.proto,%.pb.h,$(PROTO)) @@ -45,6 +48,7 @@ PUB_INC=src/sofa/pbrpc/pbrpc.h src/sofa/pbrpc/closure_helper.h src/sofa/pbrpc/cl src/sofa/pbrpc/fast_lock.h src/sofa/pbrpc/rw_lock.h src/sofa/pbrpc/scoped_locker.h \ src/sofa/pbrpc/condition_variable.h src/sofa/pbrpc/wait_event.h src/sofa/pbrpc/http.h \ src/sofa/pbrpc/buffer.h src/sofa/pbrpc/buf_handle.h src/sofa/pbrpc/profiling_linker.h \ + src/sofa/pbrpc/rpc_attachment.h \ $(PROTO) $(PROTO_HEADER) #----------------------------------------------- @@ -84,7 +88,7 @@ check_depends: @if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi clean: - rm -f $(LIB) $(BIN) $(LIB_OBJ) $(PROTO_OBJ) $(BIN_OBJ) $(PROTO_HEADER) $(PROTO_SRC) + rm -f $(LIB) $(LIB_FULL) $(BIN) $(LIB_OBJ) $(PLUGIN_OBJ) $(PROTO_OBJ) $(BIN_OBJ) $(PROTO_HEADER) $(PROTO_SRC) rebuild: clean all @@ -95,6 +99,9 @@ $(LIB_OBJ): $(PROTO_HEADER) $(LIB): $(LIB_OBJ) $(PROTO_OBJ) ar crs $@ $(LIB_OBJ) $(PROTO_OBJ) +$(LIB_FULL): $(PLUGIN_OBJ) $(LIB_OBJ) $(PROTO_OBJ) + ar crs $@ $(PLUGIN_OBJ) $(LIB_OBJ) $(PROTO_OBJ) + $(BIN): $(LIB) $(BIN_OBJ) $(CXX) $(BIN_OBJ) -o $@ $(LIB) $(LDFLAGS) @@ -104,19 +111,22 @@ $(BIN): $(LIB) $(BIN_OBJ) %.o: %.cc $(CXX) $(CXXFLAGS) -c $< -o $@ -build: $(LIB) $(BIN) +build: $(LIB) $(LIB_FULL) $(BIN) @echo @echo 'Build succeed, run "make install" to install sofa-pbrpc to "'$(PREFIX)'".' -install: $(LIB) $(BIN) +install: $(LIB) $(LIB_FULL) $(BIN) mkdir -p $(PREFIX)/include/sofa/pbrpc cp -r $(PUB_INC) $(TARGET_DIRECTORY) $(PREFIX)/include/sofa/pbrpc/ mkdir -p $(PREFIX)/include/sofa/pbrpc/smart_ptr cp src/sofa/pbrpc/smart_ptr/*.hpp $(PREFIX)/include/sofa/pbrpc/smart_ptr mkdir -p $(PREFIX)/include/sofa/pbrpc/smart_ptr/detail cp src/sofa/pbrpc/smart_ptr/detail/*.hpp $(PREFIX)/include/sofa/pbrpc/smart_ptr/detail + mkdir -p $(PREFIX)/include/sofa/pbrpc/plugin/cookie + cp src/sofa/pbrpc/plugin/cookie/*.h $(PREFIX)/include/sofa/pbrpc/plugin/cookie mkdir -p $(PREFIX)/lib cp $(LIB) $(PREFIX)/lib/ + cp $(LIB_FULL) $(PREFIX)/lib/ mkdir -p $(PREFIX)/bin cp $(BIN) $(PREFIX)/bin/ @echo diff --git a/sample/echo/Makefile b/sample/echo/Makefile index 9b12585..1a6db23 100644 --- a/sample/echo/Makefile +++ b/sample/echo/Makefile @@ -32,7 +32,7 @@ INCPATH=-I. -I$(SOFA_PBRPC)/include -I$(PROTOBUF_DIR)/include \ -I$(SNAPPY_DIR)/include -I$(ZLIB_DIR)/include CXXFLAGS += $(OPT) -pipe -W -Wall -fPIC -D_GNU_SOURCE -D__STDC_LIMIT_MACROS $(INCPATH) -LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a +LIBRARY=$(SOFA_PBRPC)/lib/libsofa-pbrpc-full.a $(PROTOBUF_DIR)/lib/libprotobuf.a $(SNAPPY_DIR)/lib/libsnappy.a LDFLAGS += -L$(ZLIB_DIR)/lib -lpthread -lz UNAME_S := $(shell uname -s) @@ -57,7 +57,7 @@ check_depends: @if [ ! -f "$(SNAPPY_DIR)/include/snappy.h" ]; then echo "ERROR: need snappy header"; exit 1; fi @if [ ! -f "$(SNAPPY_DIR)/lib/libsnappy.a" ]; then echo "ERROR: need snappy lib"; exit 1; fi @if [ ! -f "$(SOFA_PBRPC)/include/sofa/pbrpc/pbrpc.h" ]; then echo "ERROR: need sofa-pbrpc header"; exit 1; fi - @if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc.a" ]; then echo "ERROR: need sofa-pbrpc lib"; exit 1; fi + @if [ ! -f "$(SOFA_PBRPC)/lib/libsofa-pbrpc-full.a" ]; then echo "ERROR: need sofa-pbrpc-full lib"; exit 1; fi clean: @rm -f $(BIN) *.o *.pb.* diff --git a/sample/echo/client_async.cc b/sample/echo/client_async.cc index 3055c09..16d0523 100644 --- a/sample/echo/client_async.cc +++ b/sample/echo/client_async.cc @@ -6,8 +6,12 @@ #include #include +#include #include "echo_service.pb.h" +typedef sofa::pbrpc::shared_ptr RpcCookiePtr; +sofa::pbrpc::RpcCookieManager g_cookie_manager; + void EchoCallback(sofa::pbrpc::RpcController* cntl, sofa::pbrpc::test::EchoRequest* request, sofa::pbrpc::test::EchoResponse* response, @@ -24,8 +28,17 @@ void EchoCallback(sofa::pbrpc::RpcController* cntl, if (cntl->Failed()) { SLOG(ERROR, "request failed: %s", cntl->ErrorText().c_str()); } - else { + else + { SLOG(NOTICE, "request succeed: %s", response->message().c_str()); + RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager)); + if (cntl->GetResponseAttachment(cookie.get())) + { + std::string version; + cookie->Get("version", version); + SLOG(NOTICE, "cookie version=%s", version.c_str()); + cookie->Store(); + } } delete cntl; @@ -46,9 +59,15 @@ int main() sofa::pbrpc::RpcChannelOptions channel_options; sofa::pbrpc::RpcChannel rpc_channel(&rpc_client, "127.0.0.1:12321", channel_options); + RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager)); + cookie->Load(); + cookie->Set("type", "async"); + cookie->Set("logid", "123456"); + // Prepare parameters. sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController(); cntl->SetTimeout(3000); + cntl->SetRequestAttachment(cookie.get()); sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest(); request->set_message("Hello from qinzuoyan01"); sofa::pbrpc::test::EchoResponse* response = new sofa::pbrpc::test::EchoResponse(); diff --git a/sample/echo/client_sync.cc b/sample/echo/client_sync.cc index 7b26ff5..193ccec 100644 --- a/sample/echo/client_sync.cc +++ b/sample/echo/client_sync.cc @@ -5,8 +5,12 @@ // Author: qinzuoyan01@baidu.com (Qin Zuoyan) #include +#include #include "echo_service.pb.h" +typedef sofa::pbrpc::shared_ptr RpcCookiePtr; +sofa::pbrpc::RpcCookieManager g_cookie_manager; + // Using global RpcClient object can help share resources such as threads and buffers. sofa::pbrpc::RpcClient g_rpc_client; @@ -21,6 +25,11 @@ int main() // Prepare parameters. sofa::pbrpc::RpcController* cntl = new sofa::pbrpc::RpcController(); cntl->SetTimeout(3000); + RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie(&g_cookie_manager)); + cookie->Load(); + cookie->Set("type", "sync"); + cookie->Set("logid", "123456"); + cntl->SetRequestAttachment(cookie.get()); sofa::pbrpc::test::EchoRequest* request = new sofa::pbrpc::test::EchoRequest(); request->set_message("Hello from qinzuoyan01"); @@ -50,6 +59,14 @@ int main() else { SLOG(NOTICE, "request succeed: %s", response->message().c_str()); + cookie.reset(new sofa::pbrpc::RpcCookie(&g_cookie_manager)); + if (cntl->GetResponseAttachment(cookie.get())) + { + std::string version; + cookie->Get("version", version); + SLOG(NOTICE, "cookie version=%s", version.c_str()); + cookie->Store(); + } } // Destroy objects. diff --git a/sample/echo/server.cc b/sample/echo/server.cc index c38015e..b7c7010 100644 --- a/sample/echo/server.cc +++ b/sample/echo/server.cc @@ -7,8 +7,11 @@ #include #include #include +#include #include "echo_service.pb.h" +typedef sofa::pbrpc::shared_ptr RpcCookiePtr; + bool WebServlet(const sofa::pbrpc::HTTPRequest& request, sofa::pbrpc::HTTPResponse& response) { SLOG(INFO, "WebServlet(): request message from %s:%u", @@ -53,6 +56,17 @@ class EchoServerImpl : public sofa::pbrpc::test::EchoServer SLOG(INFO, "Header[\"%s\"]=\"%s\"", it->first.c_str(), it->second.c_str()); } } + RpcCookiePtr cookie(new sofa::pbrpc::RpcCookie()); + if (cntl->GetRequestAttachment(cookie.get())) + { + std::string type; + std::string logid; + cookie->Get("type", type); + cookie->Get("logid", logid); + SLOG(INFO, "cookie info : type=%s, logid=%s", type.c_str(), logid.c_str()); + } + cookie->Set("version", "1.00"); + cntl->SetResponseAttachment(cookie.get()); response->set_message("echo message: " + request->message()); done->Run(); } diff --git a/src/sofa/pbrpc/binary_rpc_request.cc b/src/sofa/pbrpc/binary_rpc_request.cc index 83a6ea9..04fd16e 100644 --- a/src/sofa/pbrpc/binary_rpc_request.cc +++ b/src/sofa/pbrpc/binary_rpc_request.cc @@ -83,12 +83,48 @@ void BinaryRpcRequest::ProcessRequest( bool parse_request_return = false; if (compress_type == CompressTypeNone) { - parse_request_return = request->ParseFromZeroCopyStream(_req_body.get()); + parse_request_return = + request->ParseFromBoundedZeroCopyStream(_req_body.get(), _req_header.data_size); } else { + ReadBufferPtr read_buffer(new ReadBuffer()); + int bytes_read = 0; + while (bytes_read < _req_header.data_size) + { + const char* read_pos = NULL; + int cur_size; + int bytes_remain = _req_header.data_size - bytes_read; + int handle_offset = _req_body->CurrentHandleOffset(); + if (!_req_body->Next(reinterpret_cast(&read_pos), &cur_size)) + { +#if defined( LOG ) + LOG(ERROR) << "ProcessRequest(): " << RpcEndpointToString(_remote_endpoint) + << ": {" << _req_meta.sequence_id() << "}" + << ": bad request buffer"; +#else + SLOG(ERROR, "ProcessRequest(): %s: {%lu}: bad request buffer", + RpcEndpointToString(_remote_endpoint).c_str(), + _req_meta.sequence_id()); +#endif + SendFailedResponse(stream, RPC_ERROR_PARSE_REQUEST_MESSAGE, "bad request buffer"); + return; + } + char* handle_data = const_cast(read_pos) - handle_offset; + if (bytes_remain >= cur_size) + { + read_buffer->Append(BufHandle(handle_data, cur_size, handle_offset)); + bytes_read += cur_size; + } + else + { + _req_body->BackUp(cur_size - bytes_remain); + read_buffer->Append(BufHandle(handle_data, bytes_remain, handle_offset)); + bytes_read += bytes_remain; + } + } sofa::pbrpc::scoped_ptr is( - get_compressed_input_stream(_req_body.get(), compress_type)); + get_compressed_input_stream(read_buffer.get(), compress_type)); parse_request_return = request->ParseFromZeroCopyStream(is.get()); } if (!parse_request_return) @@ -123,6 +159,9 @@ void BinaryRpcRequest::ProcessRequest( cntl->SetResponseCompressType(_req_meta.has_expected_response_compress_type() ? _req_meta.expected_response_compress_type() : CompressTypeNone); + cntl->SetRequestSize(_req_header.data_size); + cntl->SetRequestAttachBuffer(_req_body); + CallMethod(method_board, controller, request, response); } @@ -170,11 +209,14 @@ ReadBufferPtr BinaryRpcRequest::AssembleSucceedResponse( return ReadBufferPtr(); } header.data_size = write_buffer.ByteCount() - header_pos - header_size - header.meta_size; - header.message_size = header.meta_size + header.data_size; + ReadBufferPtr response_attach_buffer = cntl->GetResponseAttachBuffer(); + header.message_size = header.meta_size + header.data_size + response_attach_buffer->TotalCount(); + write_buffer.SetData(header_pos, reinterpret_cast(&header), header_size); ReadBufferPtr read_buffer(new ReadBuffer()); write_buffer.SwapOut(read_buffer.get()); + read_buffer->Append(response_attach_buffer.get()); return read_buffer; } diff --git a/src/sofa/pbrpc/buffer.cc b/src/sofa/pbrpc/buffer.cc index 1a45435..bff002b 100644 --- a/src/sofa/pbrpc/buffer.cc +++ b/src/sofa/pbrpc/buffer.cc @@ -103,6 +103,11 @@ bool ReadBuffer::Next(const void** data, int* size) } } +int ReadBuffer::CurrentHandleOffset() +{ + return _cur_it->offset + _cur_pos; +} + // BackUp() can only be called after a successful Next(). // "count" should be greater than or equal to 0. void ReadBuffer::BackUp(int count) diff --git a/src/sofa/pbrpc/buffer.h b/src/sofa/pbrpc/buffer.h index 42ae569..164dc73 100644 --- a/src/sofa/pbrpc/buffer.h +++ b/src/sofa/pbrpc/buffer.h @@ -58,6 +58,10 @@ class ReadBuffer : public google::protobuf::io::ZeroCopyInputStream // implements ZeroCopyInputStream ---------------------------------- bool Next(const void** data, int* size); + + // Get the offset of current buffer handle. + int CurrentHandleOffset(); + void BackUp(int count); bool Skip(int count); int64 ByteCount() const; diff --git a/src/sofa/pbrpc/plugin/cookie/rpc_cookie.cc b/src/sofa/pbrpc/plugin/cookie/rpc_cookie.cc new file mode 100644 index 0000000..1718d70 --- /dev/null +++ b/src/sofa/pbrpc/plugin/cookie/rpc_cookie.cc @@ -0,0 +1,212 @@ +// Copyright (c) 2014 Baidu.com, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Author: zhangdi05@baidu.com (Zhang Di) + +#include +#include +#include + +namespace sofa { +namespace pbrpc { + +bool RpcCookieManager::Has(const std::string& key) +{ + ScopedLocker _(_lock); + return _kv_map.find(key) != _kv_map.end(); +} + +void RpcCookieManager::Load(std::map& kv_map) +{ + ScopedLocker _(_lock); + kv_map = _kv_map; +} + +void RpcCookieManager::Store(const std::map& kv_map) +{ + ScopedLocker _(_lock); + std::set remove_keys; + std::map add_or_modify_pairs; + std::map::const_iterator old_it = _kv_map.begin(); + std::map::const_iterator new_it = kv_map.begin(); + while (old_it != _kv_map.end() && new_it != kv_map.end()) + { + if(old_it->first < new_it->first) + { + remove_keys.insert(old_it->first); + ++old_it; + } + else if (old_it->first > new_it->first) + { + add_or_modify_pairs.insert(*new_it); + ++new_it; + } + else + { + if (old_it->second != new_it->second) + { + add_or_modify_pairs.insert(*new_it); + } + ++old_it; + ++new_it; + } + } + while (old_it != _kv_map.end()) + { + remove_keys.insert(old_it->first); + ++old_it; + } + if (new_it != kv_map.end()) + { + add_or_modify_pairs.insert(new_it, kv_map.end()); + } + for (std::set::const_iterator it = remove_keys.begin(); + it != remove_keys.end(); ++it) + { + _kv_map.erase(*it); + } + for (std::map::const_iterator it = add_or_modify_pairs.begin(); + it != add_or_modify_pairs.end(); ++it) + { + _kv_map[it->first] = it->second; + } +} + +RpcCookie::RpcCookie() : _manager(NULL) +{ } + +RpcCookie::RpcCookie(RpcCookieManager* manager) : _manager(manager) +{ } + +RpcCookie::~RpcCookie() +{ } + +bool RpcCookie::Get(const std::string& key, std::string& value) +{ + std::map::const_iterator it = _kv_map.find(key); + if (it == _kv_map.end()) + { + return false; + } + value = it->second; + return true; +} + +void RpcCookie::Set(const std::string& key, const std::string& value) +{ + _kv_map[key] = value; +} + +void RpcCookie::Load() +{ + if (_manager != NULL) + { + _manager->Load(_kv_map); + } +} + +void RpcCookie::Store() +{ + if (_manager != NULL) + { + _manager->Store(_kv_map); + } +} + +bool RpcCookie::Has(const std::string& key) +{ + return _kv_map.find(key) != _kv_map.end(); +} + +bool RpcCookie::Empty() +{ + return _kv_map.empty(); +} + +bool RpcCookie::Erase(const std::string& key) +{ + if (_kv_map.find(key) == _kv_map.end()) + { + return false; + } + _kv_map.erase(key); + return true; +} + +void RpcCookie::Clear() +{ + _kv_map.clear(); +} + +bool RpcCookie::Serialize(sofa::pbrpc::ReadBufferPtr& attach_buffer) +{ + int size = _kv_map.size(); + SCHECK_LE(static_cast(size), SOFA_CONTAINER_MAX_SERIALIZE_SIZE); + WriteBufferPtr write_buffer(new WriteBuffer()); + Serializer serializer(write_buffer); + if (!serializer.serialize_varint(size)) + { + return false; + } + std::map::const_iterator it = _kv_map.begin(); + for (; it != _kv_map.end(); ++it) + { + if (!serializer.serialize_string(it->first)) + { + return false; + } + if (!serializer.serialize_string(it->second)) + { + return false; + } + } + write_buffer->SwapOut(attach_buffer.get()); + return true; +} + +int RpcCookie::SerializeLen() const +{ + int len = Serializer::varint_len(_kv_map.size()); + std::map::const_iterator it = _kv_map.begin(); + for (; it != _kv_map.end(); ++it) + { + len += Serializer::string_len(it->first); + len += Serializer::string_len(it->second); + } + return len; +} + +bool RpcCookie::Deserialize(sofa::pbrpc::ReadBufferPtr& attach_buffer) +{ + int size = 0; + Deserializer deserializer(attach_buffer); + if (!deserializer.deserialize_varint(size)) + { + return false; + } + if (static_cast(size) > SOFA_CONTAINER_MAX_SERIALIZE_SIZE) + { + return false; + } + _kv_map.clear(); + for (int i = 0; i < size; ++i) + { + std::string key; + if (!deserializer.deserialize_string(key)) + { + return false; + } + std::string value; + if (!deserializer.deserialize_string(value)) + { + return false; + } + _kv_map[key] = value; + } + return true; +} +} // namespace pbrpc +} // namespace sofa + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/src/sofa/pbrpc/plugin/cookie/rpc_cookie.h b/src/sofa/pbrpc/plugin/cookie/rpc_cookie.h new file mode 100644 index 0000000..375bf8c --- /dev/null +++ b/src/sofa/pbrpc/plugin/cookie/rpc_cookie.h @@ -0,0 +1,84 @@ +// Copyright (c) 2014 Baidu.com, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Author: zhangdi05@baidu.com (Zhang Di) + +#ifndef _SOFA_PBRPC_PLUGIN_COOKIE_RPC_COOKIE_H_ +#define _SOFA_PBRPC_PLUGIN_COOKIE_RPC_COOKIE_H_ + +#include +#include +#include +#include +#include + +namespace sofa { +namespace pbrpc { + +class RpcCookieManager +{ +public: + RpcCookieManager() + { } + + ~RpcCookieManager() + { } + + bool Has(const std::string& key); + + void Load(std::map& kv_map); + + void Store(const std::map& kv_map); + +private: + std::map _kv_map; + + MutexLock _lock; +}; // class RpcCookieManager + +class RpcCookie : public sofa::pbrpc::RpcAttachment +{ +public: + RpcCookie(); + + explicit RpcCookie(RpcCookieManager* manager); + + virtual ~RpcCookie(); + + bool Get(const std::string& key, std::string& value); + + void Set(const std::string& key, const std::string& value); + + bool Has(const std::string& key); + + bool Empty(); + + bool Erase(const std::string& key); + + void Clear(); + + void Load(); + + void Store(); + + virtual bool Serialize(sofa::pbrpc::ReadBufferPtr& attach_buffer); + + virtual bool Deserialize(sofa::pbrpc::ReadBufferPtr& attach_buffer); + + int SerializeLen() const; + +private: + std::map _kv_map; + + RpcCookieManager* _manager; + + SOFA_PBRPC_DISALLOW_EVIL_CONSTRUCTORS(RpcCookie); +}; // class RpcCookie + +} // namespace pbrpc +} // namespace sofa + +#endif // _SOFA_PBRPC_PLUGIN_COOKIE_RPC_COOKIE_H_ + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/src/sofa/pbrpc/rpc_attachment.h b/src/sofa/pbrpc/rpc_attachment.h new file mode 100644 index 0000000..40ba10d --- /dev/null +++ b/src/sofa/pbrpc/rpc_attachment.h @@ -0,0 +1,34 @@ +// Copyright (c) 2014 Baidu.com, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Author: zhangdi05@baidu.com (Zhang Di) + +#ifndef _SOFA_PBRPC_RPC_ATTACHMENT_H_ +#define _SOFA_PBRPC_RPC_ATTACHMENT_H_ + +#include +#include + +namespace sofa { +namespace pbrpc { + +class RpcAttachment; +typedef sofa::pbrpc::shared_ptr RpcAttachmentPtr; + +class RpcAttachment +{ +public: + virtual ~RpcAttachment() {} + + virtual bool Serialize(ReadBufferPtr& attachment_buffer) = 0; + + virtual bool Deserialize(ReadBufferPtr& attachment_buffer) = 0; +}; // class RpcAttachment + +} // namespace pbrpc +} // namespace sofa + +#endif // _SOFA_PBRPC_RPC_ATTACHMENT_H_ + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/src/sofa/pbrpc/rpc_client_impl.cc b/src/sofa/pbrpc/rpc_client_impl.cc index a1a9b91..b52e123 100644 --- a/src/sofa/pbrpc/rpc_client_impl.cc +++ b/src/sofa/pbrpc/rpc_client_impl.cc @@ -322,11 +322,14 @@ void RpcClientImpl::CallMethod(const google::protobuf::Message* request, return; } header.data_size = write_buffer.ByteCount() - header_pos - header_size - header.meta_size; - header.message_size = header.meta_size + header.data_size; + ReadBufferPtr request_attach_buffer = cntl->GetRequestAttachBuffer(); + header.message_size = header.meta_size + header.data_size + request_attach_buffer->TotalCount(); + write_buffer.SetData(header_pos, reinterpret_cast(&header), header_size); ReadBufferPtr read_buffer(new ReadBuffer()); write_buffer.SwapOut(read_buffer.get()); + read_buffer->Append(request_attach_buffer.get()); cntl->SetRequestBuffer(read_buffer); // push callback @@ -434,15 +437,49 @@ void RpcClientImpl::DoneCallback(google::protobuf::Message* response, SCHECK(cntl->ResponseBuffer()); ReadBufferPtr buffer = cntl->ResponseBuffer(); CompressType compress_type = cntl->ResponseCompressType(); + int data_size = cntl->ResponseSize(); bool parse_response_return = false; if (compress_type == CompressTypeNone) { - parse_response_return = response->ParseFromZeroCopyStream(buffer.get()); + parse_response_return = response->ParseFromBoundedZeroCopyStream(buffer.get(), data_size); } else { + ReadBufferPtr read_buffer(new ReadBuffer()); + int bytes_read = 0; + while (bytes_read < data_size) + { + const char* read_pos = NULL; + int cur_size; + int bytes_remain = data_size - bytes_read; + int handle_offset = buffer->CurrentHandleOffset(); + if (!buffer->Next(reinterpret_cast(&read_pos), &cur_size)) + { +#if defined ( LOG ) + LOG(ERROR) << "DoneCallback(): " << RpcEndpointToString(cntl->RemoteEndpoint()) + << ": bad response buffer"; +#else + SLOG(ERROR, "DoneCallback(): %s: bad response buffer", + RpcEndpointToString(cntl->RemoteEndpoint()).c_str()); +#endif + cntl->SetFailed(RPC_ERROR_PARSE_RESPONSE_MESSAGE, "bad response buffer"); + return; + } + char* handle_data = const_cast(read_pos) - handle_offset; + if (bytes_remain >= cur_size) + { + read_buffer->Append(BufHandle(handle_data, cur_size, handle_offset)); + bytes_read += cur_size; + } + else + { + buffer->BackUp(cur_size - bytes_remain); + read_buffer->Append(BufHandle(handle_data, bytes_remain, handle_offset)); + bytes_read += bytes_remain; + } + } ::sofa::pbrpc::scoped_ptr is( - get_compressed_input_stream(buffer.get(), compress_type)); + get_compressed_input_stream(read_buffer.get(), compress_type)); parse_response_return = response->ParseFromZeroCopyStream(is.get()); } if (!parse_response_return) @@ -455,7 +492,9 @@ void RpcClientImpl::DoneCallback(google::protobuf::Message* response, RpcEndpointToString(cntl->RemoteEndpoint()).c_str()); #endif cntl->SetFailed(RPC_ERROR_PARSE_RESPONSE_MESSAGE, "parse response message pb failed"); + return; } + cntl->SetResponseAttachBuffer(buffer); } } diff --git a/src/sofa/pbrpc/rpc_client_stream.h b/src/sofa/pbrpc/rpc_client_stream.h index 541fb39..94e23c1 100644 --- a/src/sofa/pbrpc/rpc_client_stream.h +++ b/src/sofa/pbrpc/rpc_client_stream.h @@ -158,7 +158,8 @@ class RpcClientStream : public RpcMessageStream virtual void on_received( const ReadBufferPtr& message, int meta_size, - int64 data_size) + int64 data_size, + int attach_size) { SOFA_PBRPC_FUNCTION_TRACE; @@ -280,8 +281,9 @@ class RpcClientStream : public RpcMessageStream } else // !meta.failed() { - SCHECK_EQ(data_size, message->TotalCount() - message->ByteCount()); + SCHECK_EQ(data_size + attach_size, message->TotalCount() - message->ByteCount()); cntl->SetResponseBuffer(message); + cntl->SetResponseSize(data_size); cntl->SetResponseCompressType(meta.has_compress_type() ? meta.compress_type() : CompressTypeNone); cntl->Done(RPC_SUCCESS, "succeed"); diff --git a/src/sofa/pbrpc/rpc_controller.cc b/src/sofa/pbrpc/rpc_controller.cc index 57cad69..8bfbfee 100644 --- a/src/sofa/pbrpc/rpc_controller.cc +++ b/src/sofa/pbrpc/rpc_controller.cc @@ -119,6 +119,26 @@ void RpcController::NotifyOnCancel(google::protobuf::Closure* callback) _impl->NotifyOnCancel(callback); } +bool RpcController::SetRequestAttachment(RpcAttachment* request_attachment) +{ + return _impl->SetRequestAttachment(request_attachment); +} + +bool RpcController::GetRequestAttachment(RpcAttachment* request_attachment) +{ + return _impl->GetRequestAttachment(request_attachment); +} + +bool RpcController::SetResponseAttachment(RpcAttachment* response_attachment) +{ + return _impl->SetResponseAttachment(response_attachment); +} + +bool RpcController::GetResponseAttachment(RpcAttachment* response_attachment) +{ + return _impl->GetResponseAttachment(response_attachment); +} + } // namespace pbrpc } // namespace sofa diff --git a/src/sofa/pbrpc/rpc_controller.h b/src/sofa/pbrpc/rpc_controller.h index 2ec3c14..63f35a5 100644 --- a/src/sofa/pbrpc/rpc_controller.h +++ b/src/sofa/pbrpc/rpc_controller.h @@ -11,6 +11,7 @@ #include #include +#include namespace sofa { namespace pbrpc { @@ -123,6 +124,14 @@ class RpcController : public google::protobuf::RpcController // // Not supported now. virtual void StartCancel(); + + // If true, indicates that the attachment data on the client side has been + // serialized successfully. + bool SetRequestAttachment(RpcAttachment* request_attachment); + + // If true, indicates that the attachment data from the server side has been + // deserialized successfully. + bool GetResponseAttachment(RpcAttachment* response_attachment); // -------- used only by server side --------- // These calls should be made from the server side only. Their results @@ -164,6 +173,14 @@ class RpcController : public google::protobuf::RpcController // NotifyOnCancel() must be called no more than once per request. virtual void NotifyOnCancel(google::protobuf::Closure* callback); + // If true, indicates that the attachment data from the client side has been + // deserialized successfully. + bool GetRequestAttachment(RpcAttachment* request_attachment); + + // If true, indicates that the attachment data on the server side has been + // serialized successfully. + bool SetResponseAttachment(RpcAttachment* response_attachment); + public: const sofa::pbrpc::shared_ptr& impl() const { diff --git a/src/sofa/pbrpc/rpc_controller_impl.h b/src/sofa/pbrpc/rpc_controller_impl.h index 79dabb2..2d93f9c 100644 --- a/src/sofa/pbrpc/rpc_controller_impl.h +++ b/src/sofa/pbrpc/rpc_controller_impl.h @@ -19,6 +19,7 @@ #include #include #include +#include namespace sofa { namespace pbrpc { @@ -45,6 +46,8 @@ class RpcControllerImpl : public sofa::pbrpc::enable_shared_from_thisSerialize(_request_attach_buffer); + } + + bool GetRequestAttachment(RpcAttachment* request_attachment) + { + SCHECK(request_attachment); + return request_attachment->Deserialize(_request_attach_buffer); + } + + bool SetResponseAttachment(RpcAttachment* response_attachment) + { + SCHECK(response_attachment); + return response_attachment->Serialize(_response_attach_buffer); + } + + bool GetResponseAttachment(RpcAttachment* response_attachment) + { + SCHECK(response_attachment); + return response_attachment->Deserialize(_response_attach_buffer);; + } + + void SetRequestAttachBuffer(const ReadBufferPtr& request_attach_buffer) + { + _request_attach_buffer = request_attach_buffer; + } + + const ReadBufferPtr& GetRequestAttachBuffer() + { + return _request_attach_buffer; + } + + void SetResponseAttachBuffer(const ReadBufferPtr& response_attach_buffer) + { + _response_attach_buffer = response_attach_buffer; + } + + const ReadBufferPtr& GetResponseAttachBuffer() + { + return _response_attach_buffer; + } + void NotifyRequestSent(const RpcEndpoint& local_endpoint, int64 sent_bytes) { _is_request_sent = true; @@ -435,6 +482,26 @@ class RpcControllerImpl : public sofa::pbrpc::enable_shared_from_this* _http_query_params; const std::map* _http_headers; + ReadBufferPtr _request_attach_buffer; + ReadBufferPtr _response_attach_buffer; + + int _request_size; + int _response_size; + SOFA_PBRPC_DISALLOW_EVIL_CONSTRUCTORS(RpcControllerImpl); }; // class RpcControllerImpl diff --git a/src/sofa/pbrpc/rpc_message_stream.h b/src/sofa/pbrpc/rpc_message_stream.h index 4522991..b53137d 100644 --- a/src/sofa/pbrpc/rpc_message_stream.h +++ b/src/sofa/pbrpc/rpc_message_stream.h @@ -197,10 +197,12 @@ class RpcMessageStream : public RpcByteStream // @param message the rough received message, including meta and data. // @param meta_size the size of meta. // @param data_size the size of data. + // @param attch_buffer the attachment buffer. virtual void on_received( const ReadBufferPtr& message, int meta_size, - int64 data_size) = 0; + int64 data_size, + int attach_size) = 0; private: virtual bool on_connected() @@ -276,7 +278,7 @@ class RpcMessageStream : public RpcByteStream while (!is_closed() && !received_messages.empty()) { const ReceivedItem& item = received_messages.front(); - on_received(item.message, item.meta_size, item.data_size); + on_received(item.message, item.meta_size, item.data_size, item.attach_size); received_messages.pop_front(); } } @@ -604,8 +606,9 @@ class RpcMessageStream : public RpcByteStream { _receiving_message->Append(BufHandle(_tran_buf, consume_size, data - _tran_buf)); } + int attach_size = _receiving_header.message_size - _receiving_header.meta_size - _receiving_header.data_size; received_messages->push_back(ReceivedItem(_receiving_message, - _receiving_header.meta_size, _receiving_header.data_size)); + _receiving_header.meta_size, _receiving_header.data_size, attach_size)); reset_receiving_env(); data += consume_size; size -= consume_size; @@ -649,7 +652,7 @@ class RpcMessageStream : public RpcByteStream #endif return -1; } - if (_receiving_header.meta_size + _receiving_header.data_size != _receiving_header.message_size) + if (_receiving_header.meta_size + _receiving_header.data_size > _receiving_header.message_size) { #if defined( LOG ) LOG(ERROR) << "identify_message_header(): " << RpcEndpointToString(_remote_endpoint) @@ -736,12 +739,15 @@ class RpcMessageStream : public RpcByteStream ReadBufferPtr message; int meta_size; int64 data_size; + int attach_size; ReceivedItem(const ReadBufferPtr& _message, int _meta_size, - int64 _data_size) + int64 _data_size, + int _attach_size) : message(_message) , meta_size(_meta_size) - , data_size(_data_size) {} + , data_size(_data_size) + , attach_size(_attach_size) {} }; // TODO improve sync queue performance diff --git a/src/sofa/pbrpc/serialize.cc b/src/sofa/pbrpc/serialize.cc new file mode 100644 index 0000000..a0d6a0b --- /dev/null +++ b/src/sofa/pbrpc/serialize.cc @@ -0,0 +1,358 @@ +// Copyright (c) 2014 Baidu.com, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Author: qinzuoyan01@baidu.com (Qin Zuoyan) + +#include +#include + +namespace sofa { +namespace pbrpc { + +// NOTICE: When compiling by gcc with '-O2' option, the data may be out of order. +#define sofa_io_barrier() __asm__ __volatile__("": : :"memory") + +// fast copy small data block +static inline void fast_memcpy(void* dest, const void* src, size_t n) +{ + switch (n) + { + case 0: + return; + case 1: + *(uint8_t*)dest = *(const uint8_t*)src; + return; + case 2: + *(uint16_t*)dest = *(const uint16_t*)src; + return; + case 3: + *(byte_helper_t<3>*)dest = *(const byte_helper_t<3>*)src; + return; + case 4: + *(uint32_t*)dest = *(const uint32_t*)src; + return; + case 5: + *(byte_helper_t<5>*)dest = *(const byte_helper_t<5>*)src; + return; + case 6: + *(byte_helper_t<6>*)dest = *(const byte_helper_t<6>*)src; + return; + case 7: + *(byte_helper_t<7>*)dest = *(const byte_helper_t<7>*)src; + return; + case 8: + *(uint64_t*)dest = *(const uint64_t*)src; + return; + case 9: + *(byte_helper_t<9>*)dest = *(const byte_helper_t<9>*)src; + return; + case 10: + *(byte_helper_t<10>*)dest = *(const byte_helper_t<10>*)src; + return; + case 11: + *(byte_helper_t<11>*)dest = *(const byte_helper_t<11>*)src; + return; + case 12: + *(byte_helper_t<12>*)dest = *(const byte_helper_t<12>*)src; + return; + case 13: + *(byte_helper_t<13>*)dest = *(const byte_helper_t<13>*)src; + return; + case 14: + *(byte_helper_t<14>*)dest = *(const byte_helper_t<14>*)src; + return; + case 15: + *(byte_helper_t<15>*)dest = *(const byte_helper_t<15>*)src; + return; + case 16: + *(byte_helper_t<16>*)dest = *(const byte_helper_t<16>*)src; + return; + default: + memcpy(dest, src, n); + } +} + +bool Serializer::close() +{ + bool ret = true; + if (_buf_size > 0) + { + _stream->BackUp(_buf_size); + } + _buf = NULL; + _buf_size = 0; + return ret; +} + +bool Serializer::serialize_buffer(const void* data, int data_len) +{ + while (_buf_size < data_len) + { + fast_memcpy(_buf, data, _buf_size); + data = ((const char*)data) + _buf_size; + data_len -= _buf_size; + if (!_stream->Next(&_buf, &_buf_size)) + { + _buf_size = 0; + return false; + } + } + fast_memcpy(_buf, data, data_len); + _buf = ((char*)_buf) + data_len; + _buf_size -= data_len; + return true; +} + +bool Serializer::serialize_string(const std::string& d) +{ + if (d.size() > SOFA_STRING_MAX_SERIALIZE_SIZE) + { + SLOG(ERROR, "too big string size: %u", d.size()); + return false; + } + if (!serialize_varint(d.size())) + { + return false; + } + return serialize_buffer(d.c_str(), d.size()); +} + +// NOTICE: When compiling by gcc with '-O2' option, the data may be out of order. +// It occurs between modify 'd' and 'serialize_X_byte', so we use 'sofa_io_barrier()' to prevent it. +bool Serializer::serialize_varint(uint64_t d) +{ + if (d < (1UL << 7)) + { + d <<= 1; + sofa_io_barrier(); + return serialize_1_byte(&d); + } + if (d < (1UL << 14)) + { + d <<= 2; + d |= 0x1; + sofa_io_barrier(); + return serialize_2_byte(&d); + } + if (d < (1UL << 21)) + { + d <<= 3; + d |= 0x3; + sofa_io_barrier(); + return serialize_3_byte(&d); + } + if (d < (1UL << 28)) + { + d <<= 4; + d |= 0x7; + sofa_io_barrier(); + return serialize_4_byte(&d); + } + if (d < (1UL << 35)) + { + d <<= 5; + d |= 0xF; + sofa_io_barrier(); + return serialize_5_byte(&d); + } + if (d < (1UL << 42)) + { + d <<= 6; + d |= 0x1F; + sofa_io_barrier(); + return serialize_6_byte(&d); + } + if (d < (1UL << 49)) + { + d <<= 7; + d |= 0x3F; + sofa_io_barrier(); + return serialize_7_byte(&d); + } + if (d < (1UL << 56)) + { + d <<= 8; + d |= 0x7F; + sofa_io_barrier(); + return serialize_8_byte(&d); + } + return serialize_raw_data((uint8_t)0xFF) && serialize_raw_data(d); +} + +size_t Serializer::varint_len(uint64_t d) +{ + if (d < (1UL << 7)) + { + return 1; + } + if (d < (1UL << 14)) + { + return 2; + } + if (d < (1UL << 21)) + { + return 3; + } + if (d < (1UL << 28)) + { + return 4; + } + if (d < (1UL << 35)) + { + return 5; + } + if (d < (1UL << 42)) + { + return 6; + } + if (d < (1UL << 49)) + { + return 7; + } + if (d < (1UL << 56)) + { + return 8; + } + return 9; +} + +void Deserializer::close() +{ + if (_buf_size > 0) + { + _stream->BackUp(_buf_size); + } + _buf = NULL; + _buf_size = 0; +} + +bool Deserializer::deserialize_buffer(void* data, size_t data_len) +{ + while (static_cast(_buf_size) < data_len) + { + fast_memcpy(data, _buf, _buf_size); + data = ((char*)data) + _buf_size; + data_len -= _buf_size; + if (!_stream->Next(&_buf, &_buf_size)) + { + _buf_size = 0; + return false; + } + } + fast_memcpy(data, _buf, data_len); + _buf = ((const char*)_buf) + data_len; + _buf_size -= data_len; + return true; +} + +bool Deserializer::deserialize_string(std::string& d) +{ + size_t str_len = 0; + if (!deserialize_varint(str_len)) + { + return false; + } + if (str_len == 0) + { + d.clear(); + return true; + } + if (str_len > SOFA_STRING_MAX_SERIALIZE_SIZE) + { + SLOG(ERROR, "too big string size: %u", str_len); + return false; + } + StringUtils::resize_uninitialized(&d, str_len); + return deserialize_buffer(StringUtils::string_as_array(&d), str_len); +} + +bool Deserializer::deserialize_varint(uint64_t& d) +{ + d = 0; + if (!deserialize_raw_data(*(uint8_t*)&d)) + { + return false; + } + if ((d & 0x1) == 0) + { + d >>= 1; + return true; + } + if ((d & 0x2) == 0) + { + if (!deserialize_1_byte((uint8_t*)&d + 1)) + { + return false; + } + sofa_io_barrier(); + d >>= 2; + return true; + } + if ((d & 0x4) == 0) + { + if (!deserialize_2_byte((uint8_t*)&d + 1)) + { + return false; + } + sofa_io_barrier(); + d >>= 3; + return true; + } + if ((d & 0x8) == 0) + { + if (!deserialize_3_byte((uint8_t*)&d + 1)) + { + return false; + } + sofa_io_barrier(); + d >>= 4; + return true; + } + if ((d & 0x10) == 0) + { + if (!deserialize_4_byte((uint8_t*)&d + 1)) + { + return false; + } + sofa_io_barrier(); + d >>= 5; + return true; + } + if ((d & 0x20) == 0) + { + if (!deserialize_5_byte((uint8_t*)&d + 1)) + { + return false; + } + sofa_io_barrier(); + d >>= 6; + return true; + } + if ((d & 0x40) == 0) + { + if (!deserialize_6_byte((uint8_t*)&d + 1)) + { + return false; + } + sofa_io_barrier(); + d >>= 7; + return true; + } + if ((d & 0x80) == 0) + { + if (!deserialize_7_byte((uint8_t*)&d + 1)) + { + return false; + } + sofa_io_barrier(); + d >>= 8; + return true; + } + return deserialize_raw_data(d); +} + +} // namespace pbrpc +} // namespace sofa + +/* vim: set ts=4 sw=4 sts=4 tw=100 */ diff --git a/src/sofa/pbrpc/serialize.h b/src/sofa/pbrpc/serialize.h new file mode 100644 index 0000000..28ff9c9 --- /dev/null +++ b/src/sofa/pbrpc/serialize.h @@ -0,0 +1,346 @@ +// Copyright (c) 2014 Baidu.com, Inc. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. +// +// Author: qinzuoyan01@baidu.com (Qin Zuoyan) + +#ifndef _SOFA_PBRPC_SERIALIZE_H_ +#define _SOFA_PBRPC_SERIALIZE_H_ + +#include +#include +#include + +// String max serialize size: 64MB. +#define SOFA_STRING_MAX_SERIALIZE_SIZE 67108864u + +// Container max serialize size: 64MB. +#define SOFA_CONTAINER_MAX_SERIALIZE_SIZE 67108864u + +namespace sofa { +namespace pbrpc { + +// helper for fast copy small data block +template +struct byte_helper_t +{ + uint8_t data[N]; +}; + +// Serializer on output stream +class Serializer +{ +public: + explicit Serializer(WriteBufferPtr& stream) : _stream(stream) + { + _buf = NULL; + _buf_size = 0; + } + void set_stream(WriteBufferPtr& stream) + { + close(); + _stream = stream; + } + ~Serializer() + { } + // close serializer, backup un-used buffer + bool close(); + + template + bool serialize_raw_data(const T& d) + { + if (static_cast(_buf_size) >= sizeof(T)) + { + *(T*)_buf = d; + _buf = ((char*)_buf) + sizeof(T); + _buf_size -= sizeof(T); + return true; + } + return serialize_buffer(&d, sizeof(T)); + } + template + static size_t raw_data_len(const T&) + { + return sizeof(T); + } + template + static size_t max_raw_data_len(const T&) + { + return sizeof(T); + } + bool serialize_1_byte(const void __attribute__((may_alias)) * d) + { + return serialize_raw_data(*(const uint8_t __attribute__((may_alias))*)d); + } + bool serialize_2_byte(const void* d) + { + return serialize_raw_data(*(const uint16_t*)d); + } + bool serialize_3_byte(const void* d) + { + return serialize_raw_data(*(const byte_helper_t<3>*)d); + } + bool serialize_4_byte(const void* d) + { + return serialize_raw_data(*(const uint32_t*)d); + } + bool serialize_5_byte(const void* d) + { + return serialize_raw_data(*(const byte_helper_t<5>*)d); + } + bool serialize_6_byte(const void* d) + { + return serialize_raw_data(*(const byte_helper_t<6>*)d); + } + bool serialize_7_byte(const void* d) + { + return serialize_raw_data(*(const byte_helper_t<7>*)d); + } + bool serialize_8_byte(const void* d) + { + return serialize_raw_data(*(const uint64_t*)d); + } + bool serialize_buffer(const void* data, int data_len); + bool serialize_string(const std::string& d); + static size_t string_len(const std::string& d) + { + return varint_len(d.size()) + d.size(); + } + bool serialize_varint(uint8_t d) + { + return serialize_varint((uint64_t)d); + } + static size_t varint_len(uint8_t d) + { + return varint_len((uint64_t)d); + } + static size_t max_varint_len(uint8_t) + { + return max_varint_len((uint64_t)0); + } + bool serialize_varint(uint16_t d) + { + return serialize_varint((uint64_t)d); + } + static size_t varint_len(uint16_t d) + { + return varint_len((uint64_t)d); + } + static size_t max_varint_len(uint16_t) + { + return max_varint_len((uint64_t)0); + } + bool serialize_varint(uint32_t d) + { + return serialize_varint((uint64_t)d); + } + static size_t varint_len(uint32_t d) + { + return varint_len((uint64_t)d); + } + static size_t max_varint_len(uint32_t) + { + return max_varint_len((uint64_t)0); + } + bool serialize_varint(int8_t d) + { + return serialize_varint((int64_t)d); + } + static size_t varint_len(int8_t d) + { + return varint_len((int64_t)d); + } + static size_t max_varint_len(int8_t) + { + return max_varint_len((int64_t)0); + } + bool serialize_varint(int16_t d) + { + return serialize_varint((int64_t)d); + } + static size_t varint_len(int16_t d) + { + return varint_len((int64_t)d); + } + static size_t max_varint_len(int16_t) + { + return max_varint_len((int64_t)0); + } + bool serialize_varint(int32_t d) + { + return serialize_varint((int64_t)d); + } + static size_t varint_len(int32_t d) + { + return varint_len((int64_t)d); + } + static size_t max_varint_len(int32_t) + { + return max_varint_len((int64_t)0); + } + bool serialize_varint(int64_t d) + { + return serialize_varint((uint64_t)((d << 1) ^ (d >> 63))); + } + static size_t varint_len(int64_t d) + { + return varint_len((uint64_t)((d << 1) ^ (d >> 63))); + } + static size_t max_varint_len(int64_t) + { + return max_varint_len((uint64_t)0); + } + bool serialize_varint(uint64_t d); + static size_t varint_len(uint64_t d); + static size_t max_varint_len(uint64_t) + { + return 9; + } +private: + void* _buf; // current buffer to write + int _buf_size; // size of current buffer to write + WriteBufferPtr _stream; // output stream +}; + +// Deserializer on input stream +class Deserializer +{ +public: + explicit Deserializer(ReadBufferPtr& stream) : _stream(stream) + { + _buf = NULL; + _buf_size = 0; + } + ~Deserializer() + { } + // close deserializer, backup un-used buffer + void close(); + + template + bool deserialize_raw_data(T& d) + { + if (static_cast(_buf_size) >= sizeof(T)) + { + d = *(const T*)_buf; + _buf = ((const char*)_buf) + sizeof(T); + _buf_size -= sizeof(T); + return true; + } + return deserialize_buffer(&d, sizeof(T)); + } + bool deserialize_1_byte(void* d) + { + return deserialize_raw_data(*(uint8_t*)d); + } + bool deserialize_2_byte(void* d) + { + return deserialize_raw_data(*(uint16_t*)d); + } + bool deserialize_3_byte(void* d) + { + return deserialize_raw_data(*(byte_helper_t<3>*)d); + } + bool deserialize_4_byte(void* d) + { + return deserialize_raw_data(*(uint32_t*)d); + } + bool deserialize_5_byte(void* d) + { + return deserialize_raw_data(*(byte_helper_t<5>*)d); + } + bool deserialize_6_byte(void* d) + { + return deserialize_raw_data(*(byte_helper_t<6>*)d); + } + bool deserialize_7_byte(void* d) + { + return deserialize_raw_data(*(byte_helper_t<7>*)d); + } + bool deserialize_8_byte(void* d) + { + return deserialize_raw_data(*(uint64_t*)d); + } + bool deserialize_buffer(void* data, size_t data_len); + bool deserialize_string(std::string& d); + bool deserialize_varint(uint8_t& d) + { + uint64_t tmp; + if (!deserialize_varint(tmp)) + { + return false; + } + d = tmp; + return true; + } + bool deserialize_varint(uint16_t& d) + { + uint64_t tmp; + if (!deserialize_varint(tmp)) + { + return false; + } + d = tmp; + return true; + } + bool deserialize_varint(uint32_t& d) + { + uint64_t tmp; + if (!deserialize_varint(tmp)) + { + return false; + } + d = tmp; + return true; + } + bool deserialize_varint(int8_t& d) + { + int64_t tmp; + if (!deserialize_varint(tmp)) + { + return false; + } + d = tmp; + return true; + } + bool deserialize_varint(int16_t& d) + { + int64_t tmp; + if (!deserialize_varint(tmp)) + { + return false; + } + d = tmp; + return true; + } + bool deserialize_varint(int32_t& d) + { + int64_t tmp; + if (!deserialize_varint(tmp)) + { + return false; + } + d = tmp; + return true; + } + bool deserialize_varint(int64_t& d) + { + if (!deserialize_varint((uint64_t&)d)) + { + return false; + } + d = (((d & 1) << 63 >> 63) ^ (((uint64_t)d) >> 1)); + return true; + } + bool deserialize_varint(uint64_t& d); +private: + const void* _buf; // current buffer to read + int _buf_size; // size of current buffer to read + ReadBufferPtr _stream; // input stream +}; + +} // namespace pbrpc +} // namespace sofa + +#endif // _SOFA_PBRPC_SERIALIZE_H_ + +/* vim: set ts=4 sw=4 sts=4 tw=100 */