byte_buffer_reader.c 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #include <grpc/byte_buffer_reader.h>
  19. #include <string.h>
  20. #include <grpc/byte_buffer.h>
  21. #include <grpc/compression.h>
  22. #include <grpc/grpc.h>
  23. #include <grpc/slice_buffer.h>
  24. #include <grpc/support/alloc.h>
  25. #include <grpc/support/log.h>
  26. #include "src/core/lib/compression/message_compress.h"
  27. #include "src/core/lib/slice/slice_internal.h"
  28. static int is_compressed(grpc_byte_buffer *buffer) {
  29. switch (buffer->type) {
  30. case GRPC_BB_RAW:
  31. if (buffer->data.raw.compression == GRPC_COMPRESS_NONE) {
  32. return 0 /* GPR_FALSE */;
  33. }
  34. break;
  35. }
  36. return 1 /* GPR_TRUE */;
  37. }
  38. int grpc_byte_buffer_reader_init(grpc_byte_buffer_reader *reader,
  39. grpc_byte_buffer *buffer) {
  40. grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  41. grpc_slice_buffer decompressed_slices_buffer;
  42. reader->buffer_in = buffer;
  43. switch (reader->buffer_in->type) {
  44. case GRPC_BB_RAW:
  45. grpc_slice_buffer_init(&decompressed_slices_buffer);
  46. if (is_compressed(reader->buffer_in)) {
  47. if (grpc_msg_decompress(&exec_ctx,
  48. reader->buffer_in->data.raw.compression,
  49. &reader->buffer_in->data.raw.slice_buffer,
  50. &decompressed_slices_buffer) == 0) {
  51. gpr_log(GPR_ERROR,
  52. "Unexpected error decompressing data for algorithm with enum "
  53. "value '%d'.",
  54. reader->buffer_in->data.raw.compression);
  55. memset(reader, 0, sizeof(*reader));
  56. return 0;
  57. } else { /* all fine */
  58. reader->buffer_out =
  59. grpc_raw_byte_buffer_create(decompressed_slices_buffer.slices,
  60. decompressed_slices_buffer.count);
  61. }
  62. grpc_slice_buffer_destroy_internal(&exec_ctx,
  63. &decompressed_slices_buffer);
  64. } else { /* not compressed, use the input buffer as output */
  65. reader->buffer_out = reader->buffer_in;
  66. }
  67. reader->current.index = 0;
  68. break;
  69. }
  70. grpc_exec_ctx_finish(&exec_ctx);
  71. return 1;
  72. }
  73. void grpc_byte_buffer_reader_destroy(grpc_byte_buffer_reader *reader) {
  74. switch (reader->buffer_in->type) {
  75. case GRPC_BB_RAW:
  76. /* keeping the same if-else structure as in the init function */
  77. if (is_compressed(reader->buffer_in)) {
  78. grpc_byte_buffer_destroy(reader->buffer_out);
  79. }
  80. break;
  81. }
  82. }
  83. int grpc_byte_buffer_reader_next(grpc_byte_buffer_reader *reader,
  84. grpc_slice *slice) {
  85. switch (reader->buffer_in->type) {
  86. case GRPC_BB_RAW: {
  87. grpc_slice_buffer *slice_buffer;
  88. slice_buffer = &reader->buffer_out->data.raw.slice_buffer;
  89. if (reader->current.index < slice_buffer->count) {
  90. *slice = grpc_slice_ref_internal(
  91. slice_buffer->slices[reader->current.index]);
  92. reader->current.index += 1;
  93. return 1;
  94. }
  95. break;
  96. }
  97. }
  98. return 0;
  99. }
  100. grpc_slice grpc_byte_buffer_reader_readall(grpc_byte_buffer_reader *reader) {
  101. grpc_slice in_slice;
  102. size_t bytes_read = 0;
  103. const size_t input_size = grpc_byte_buffer_length(reader->buffer_out);
  104. grpc_slice out_slice = GRPC_SLICE_MALLOC(input_size);
  105. uint8_t *const outbuf = GRPC_SLICE_START_PTR(out_slice); /* just an alias */
  106. grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  107. while (grpc_byte_buffer_reader_next(reader, &in_slice) != 0) {
  108. const size_t slice_length = GRPC_SLICE_LENGTH(in_slice);
  109. memcpy(&(outbuf[bytes_read]), GRPC_SLICE_START_PTR(in_slice), slice_length);
  110. bytes_read += slice_length;
  111. grpc_slice_unref_internal(&exec_ctx, in_slice);
  112. GPR_ASSERT(bytes_read <= input_size);
  113. }
  114. grpc_exec_ctx_finish(&exec_ctx);
  115. return out_slice;
  116. }