GRPC C++  1.11.0
proto_utils.h
Go to the documentation of this file.
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #ifndef GRPCPP_IMPL_CODEGEN_PROTO_UTILS_H
20 #define GRPCPP_IMPL_CODEGEN_PROTO_UTILS_H
21 
22 #include <type_traits>
23 
31 
32 namespace grpc {
33 
34 extern CoreCodegenInterface* g_core_codegen_interface;
35 
36 namespace internal {
37 
38 class GrpcBufferWriterPeer;
39 
// 1 MiB (1024 * 1024 bytes). GenericSerialize passes this value as the second
// constructor argument of its BufferWriter, which for GrpcBufferWriter is
// block_size.
40 const int kGrpcBufferWriterMaxBufferLength = 1024 * 1024;
41 
43  public:
// Builds a writer that appends slices to a newly created raw byte buffer.
//   bp         - out-parameter; receives the buffer created here through
//                grpc_raw_byte_buffer_create(NULL, 0), i.e. with no initial
//                slices.
//   block_size - stored in block_size_; Next() uses it as the upper bound for
//                a single slice allocation.
//   total_size - stored in total_size_; Next() computes the remaining byte
//                budget as total_size_ - byte_count_.
// byte_count_ starts at 0 and have_backup_ starts as false.
44  GrpcBufferWriter(grpc_byte_buffer** bp, int block_size, int total_size)
45  : block_size_(block_size),
46  total_size_(total_size),
47  byte_count_(0),
48  have_backup_(false) {
49  *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(NULL, 0);
// Keep a pointer to the buffer's slice list; Next() and BackUp() add slices
// to it and pop slices from it directly.
50  slice_buffer_ = &(*bp)->data.raw.slice_buffer;
51  }
52 
// Drops the reference on backup_slice_ when have_backup_ is set; does nothing
// otherwise.
53  ~GrpcBufferWriter() override {
54  if (have_backup_) {
55  g_core_codegen_interface->grpc_slice_unref(backup_slice_);
56  }
57  }
58 
59  bool Next(void** data, int* size) override {
60  // Protobuf should not ask for more memory than total_size_.
62  size_t remain = total_size_ - byte_count_;
63  if (have_backup_) {
65  have_backup_ = false;
66  if (GRPC_SLICE_LENGTH(slice_) > remain) {
68  }
69  } else {
70  // When less than a whole block is needed, only allocate that much.
71  // But make sure the allocated slice is not inlined.
72  size_t allocate_length =
73  remain > static_cast<size_t>(block_size_) ? block_size_ : remain;
74  slice_ = g_core_codegen_interface->grpc_slice_malloc(
75  allocate_length > GRPC_SLICE_INLINED_SIZE
76  ? allocate_length
78  }
80  // On win x64, int is only 32bit
82  byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_);
83  g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_);
84  return true;
85  }
86 
87  void BackUp(int count) override {
88  g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
89  if ((size_t)count == GRPC_SLICE_LENGTH(slice_)) {
91  } else {
92  backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
93  &slice_, GRPC_SLICE_LENGTH(slice_) - count);
94  g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_);
95  }
96  // It's dangerous to keep an inlined grpc_slice as the backup slice, since
97  // on a following Next() call, a reference will be returned to this slice
98  // via GRPC_SLICE_START_PTR, which will not be an address held by
99  // slice_buffer_.
101  byte_count_ -= count;
102  }
103 
// Returns byte_count_: Next() adds the length of each slice it hands out and
// BackUp() subtracts the count it is given.
104  grpc::protobuf::int64 ByteCount() const override { return byte_count_; }
105 
106  protected:
107  friend class GrpcBufferWriterPeer;
108  const int block_size_;
109  const int total_size_;
110  int64_t byte_count_;
115 };
116 
118  public:
120  : byte_count_(0), backup_count_(0), status_() {
121  if (!g_core_codegen_interface->grpc_byte_buffer_reader_init(&reader_,
122  buffer)) {
123  status_ = Status(StatusCode::INTERNAL,
124  "Couldn't initialize byte buffer reader");
125  }
126  }
// Destroys reader_ through grpc_byte_buffer_reader_destroy.
// NOTE(review): this runs unconditionally, including when
// grpc_byte_buffer_reader_init failed in the constructor (status_ not ok) --
// confirm that destroying a reader whose init failed is safe.
127  ~GrpcBufferReader() override {
128  g_core_codegen_interface->grpc_byte_buffer_reader_destroy(&reader_);
129  }
130 
131  bool Next(const void** data, int* size) override {
132  if (!status_.ok()) {
133  return false;
134  }
135  if (backup_count_ > 0) {
137  backup_count_;
138  GPR_CODEGEN_ASSERT(backup_count_ <= INT_MAX);
139  *size = (int)backup_count_;
140  backup_count_ = 0;
141  return true;
142  }
143  if (!g_core_codegen_interface->grpc_byte_buffer_reader_next(&reader_,
144  &slice_)) {
145  return false;
146  }
147  g_core_codegen_interface->grpc_slice_unref(slice_);
148  *data = GRPC_SLICE_START_PTR(slice_);
149  // On win x64, int is only 32bit
151  byte_count_ += * size = (int)GRPC_SLICE_LENGTH(slice_);
152  return true;
153  }
154 
// Returns a copy of status_. The constructor sets it to an INTERNAL status
// when grpc_byte_buffer_reader_init fails, and Next() returns false for as
// long as it is not ok().
155  Status status() const { return status_; }
156 
// Records that `count` bytes should be handed out again: the following Next()
// call reports backup_count_ as its size and resets it to 0. The value is
// assigned, not accumulated, so a second BackUp() before the next Next()
// overwrites the first. ByteCount() subtracts backup_count_ from its result.
157  void BackUp(int count) override { backup_count_ = count; }
158 
// Advances the stream by `count` bytes by calling Next() repeatedly.
// Returns true once a chunk covering the remaining count has been obtained;
// the unused tail of that chunk (size - count bytes) is handed back through
// BackUp(). Returns false if Next() returns false before `count` bytes have
// been consumed.
159  bool Skip(int count) override {
160  const void* data;
161  int size;
162  while (Next(&data, &size)) {
163  if (size >= count) {
164  BackUp(size - count);
165  return true;
166  }
167  // Here size < count: the whole chunk is consumed, keep going.
168  count -= size;
169  }
170  // Next() returned false: an error occurred or count exceeds the remaining input.
171  return false;
172  }
173 
// Returns the number of bytes handed out by Next() so far (byte_count_) minus
// the bytes currently backed up (backup_count_).
174  grpc::protobuf::int64 ByteCount() const override {
175  return byte_count_ - backup_count_;
176  }
177 
178  protected:
179  int64_t byte_count_;
180  int64_t backup_count_;
184 };
185 
186 // BufferWriter must be a subclass of io::ZeroCopyOutputStream.
187 template <class BufferWriter, class T>
189  grpc_byte_buffer** bp, bool* own_buffer) {
190  static_assert(
191  std::is_base_of<protobuf::io::ZeroCopyOutputStream, BufferWriter>::value,
192  "BufferWriter must be a subclass of io::ZeroCopyOutputStream");
193  *own_buffer = true;
194  int byte_size = msg.ByteSize();
195  if ((size_t)byte_size <= GRPC_SLICE_INLINED_SIZE) {
196  grpc_slice slice = g_core_codegen_interface->grpc_slice_malloc(byte_size);
198  GRPC_SLICE_END_PTR(slice) ==
199  msg.SerializeWithCachedSizesToArray(GRPC_SLICE_START_PTR(slice)));
200  *bp = g_core_codegen_interface->grpc_raw_byte_buffer_create(&slice, 1);
201  g_core_codegen_interface->grpc_slice_unref(slice);
202 
203  return g_core_codegen_interface->ok();
204  }
205  BufferWriter writer(bp, kGrpcBufferWriterMaxBufferLength, byte_size);
206  return msg.SerializeToZeroCopyStream(&writer)
207  ? g_core_codegen_interface->ok()
208  : Status(StatusCode::INTERNAL, "Failed to serialize message");
209 }
210 
211 // BufferReader must be a subclass of io::ZeroCopyInputStream.
212 template <class BufferReader, class T>
215  static_assert(
216  std::is_base_of<protobuf::io::ZeroCopyInputStream, BufferReader>::value,
217  "BufferReader must be a subclass of io::ZeroCopyInputStream");
218  if (buffer == nullptr) {
219  return Status(StatusCode::INTERNAL, "No payload");
220  }
221  Status result = g_core_codegen_interface->ok();
222  {
223  BufferReader reader(buffer);
224  if (!reader.status().ok()) {
225  return reader.status();
226  }
228  decoder.SetTotalBytesLimit(INT_MAX, INT_MAX);
229  if (!msg->ParseFromCodedStream(&decoder)) {
230  result = Status(StatusCode::INTERNAL, msg->InitializationErrorString());
231  }
232  if (!decoder.ConsumedEntireMessage()) {
233  result = Status(StatusCode::INTERNAL, "Did not read entire message");
234  }
235  }
236  g_core_codegen_interface->grpc_byte_buffer_destroy(buffer);
237  return result;
238 }
239 
240 } // namespace internal
241 
242 // this is needed so the following class does not conflict with protobuf
243 // serializers that utilize internal-only tools.
244 #ifdef GRPC_OPEN_SOURCE_PROTO
245 // This class provides a protobuf serializer. It translates between protobuf
246 // objects and grpc_byte_buffers. More information about SerializationTraits can
247 // be found in include/grpcpp/impl/codegen/serialization_traits.h.
248 template <class T>
249 class SerializationTraits<T, typename std::enable_if<std::is_base_of<
250  grpc::protobuf::Message, T>::value>::type> {
251  public:
// Serializes msg by delegating to internal::GenericSerialize, instantiated
// with internal::GrpcBufferWriter as the output stream type. msg, bp and
// own_buffer are forwarded unchanged and GenericSerialize's Status is
// returned as-is.
252  static Status Serialize(const grpc::protobuf::Message& msg,
253  grpc_byte_buffer** bp, bool* own_buffer) {
254  return internal::GenericSerialize<internal::GrpcBufferWriter, T>(
255  msg, bp, own_buffer);
256  }
257 
258  static Status Deserialize(grpc_byte_buffer* buffer,
260  return internal::GenericDeserialize<internal::GrpcBufferReader, T>(buffer,
261  msg);
262  }
263 };
264 #endif
265 
266 } // namespace grpc
267 
268 #endif // GRPCPP_IMPL_CODEGEN_PROTO_UTILS_H
Status GenericDeserialize(grpc_byte_buffer *buffer, grpc::protobuf::Message *msg)
Definition: proto_utils.h:213
#define GRPC_SLICE_SET_LENGTH(slice, newlen)
Definition: slice.h:122
#define GPR_CODEGEN_ASSERT(x)
Codegen specific version of GPR_ASSERT.
Definition: core_codegen_interface.h:133
Definition: proto_utils.h:117
~GrpcBufferWriter() override
Definition: proto_utils.h:53
Status status_
Definition: proto_utils.h:183
int64_t byte_count_
Definition: proto_utils.h:110
grpc::protobuf::int64 ByteCount() const override
Definition: proto_utils.h:174
const int kGrpcBufferWriterMaxBufferLength
Definition: proto_utils.h:40
Status GenericSerialize(const grpc::protobuf::Message &msg, grpc_byte_buffer **bp, bool *own_buffer)
Definition: proto_utils.h:188
A grpc_slice s, if initialized, represents the byte range s.bytes[0..s.length-1]. ...
Definition: slice.h:80
~GrpcBufferReader() override
Definition: proto_utils.h:127
int64_t byte_count_
Definition: proto_utils.h:179
bool Next(const void **data, int *size) override
Definition: proto_utils.h:131
const int total_size_
Definition: proto_utils.h:109
Definition: grpc_types.h:40
grpc_slice backup_slice_
Definition: proto_utils.h:113
#define GRPC_SLICE_START_PTR(slice)
Definition: slice.h:116
bool Skip(int count) override
Definition: proto_utils.h:159
Represents an expandable array of slices, to be interpreted as a single item.
Definition: slice.h:98
#define GRPC_SLICE_END_PTR(slice)
Definition: slice.h:125
void BackUp(int count) override
Definition: proto_utils.h:157
Defines how to serialize and deserialize some type.
Definition: serialization_traits.h:58
#define GRPC_SLICE_INLINED_SIZE
Definition: slice.h:68
GrpcBufferWriter(grpc_byte_buffer **bp, int block_size, int total_size)
Definition: proto_utils.h:44
::google::protobuf::io::ZeroCopyInputStream ZeroCopyInputStream
Definition: config_protobuf.h:88
struct grpc_slice_refcount * refcount
Definition: slice.h:81
void BackUp(int count) override
Definition: proto_utils.h:87
An Alarm posts the user provided tag to its associated completion queue upon expiry or cancellation...
Definition: alarm.h:31
grpc_slice slice_
Definition: proto_utils.h:182
const int block_size_
Definition: proto_utils.h:108
CoreCodegenInterface * g_core_codegen_interface
Definition: call.h:46
Definition: proto_utils.h:42
bool have_backup_
Definition: proto_utils.h:112
friend class GrpcBufferWriterPeer
Definition: proto_utils.h:107
::google::protobuf::int64 int64
Definition: config_protobuf.h:73
grpc_slice slice_
Definition: proto_utils.h:114
Status status() const
Definition: proto_utils.h:155
bool ok() const
Is the status OK?
Definition: status.h:118
grpc::protobuf::int64 ByteCount() const override
Definition: proto_utils.h:104
bool Next(void **data, int *size) override
Definition: proto_utils.h:59
Did it work? If it didn't, why?
Definition: status.h:31
grpc_slice_buffer * slice_buffer_
Definition: proto_utils.h:111
int64_t backup_count_
Definition: proto_utils.h:180
::google::protobuf::io::ZeroCopyOutputStream ZeroCopyOutputStream
Definition: config_protobuf.h:87
#define GRPC_SLICE_LENGTH(slice)
Definition: slice.h:119
GrpcBufferReader(grpc_byte_buffer *buffer)
Definition: proto_utils.h:119
Definition: byte_buffer_reader.h:28
Internal errors.
Definition: status_code_enum.h:119
grpc_byte_buffer_reader reader_
Definition: proto_utils.h:181
::google::protobuf::Message Message
Definition: config_protobuf.h:72
::google::protobuf::io::CodedInputStream CodedInputStream
Definition: config_protobuf.h:89