|  | @@ -34,7 +34,7 @@
 | 
	
		
			
				|  |  |  #include "src/core/lib/slice/slice_internal.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/slice/slice_string_helpers.h"
 | 
	
		
			
				|  |  |  #include "src/core/lib/support/string.h"
 | 
	
		
			
				|  |  | -#include "src/core/tsi/transport_security_interface.h"
 | 
	
		
			
				|  |  | +#include "src/core/tsi/transport_security_grpc.h"
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  #define STAGING_BUFFER_SIZE 8192
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -42,6 +42,7 @@ typedef struct {
 | 
	
		
			
				|  |  |    grpc_endpoint base;
 | 
	
		
			
				|  |  |    grpc_endpoint *wrapped_ep;
 | 
	
		
			
				|  |  |    struct tsi_frame_protector *protector;
 | 
	
		
			
				|  |  | +  struct tsi_zero_copy_grpc_protector *zero_copy_protector;
 | 
	
		
			
				|  |  |    gpr_mu protector_mu;
 | 
	
		
			
				|  |  |    /* saved upper level callbacks and user_data. */
 | 
	
		
			
				|  |  |    grpc_closure *read_cb;
 | 
	
	
		
			
				|  | @@ -67,6 +68,7 @@ static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) {
 | 
	
		
			
				|  |  |    secure_endpoint *ep = secure_ep;
 | 
	
		
			
				|  |  |    grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep);
 | 
	
		
			
				|  |  |    tsi_frame_protector_destroy(ep->protector);
 | 
	
		
			
				|  |  | +  tsi_zero_copy_grpc_protector_destroy(ep->zero_copy_protector);
 | 
	
		
			
				|  |  |    grpc_slice_buffer_destroy_internal(exec_ctx, &ep->leftover_bytes);
 | 
	
		
			
				|  |  |    grpc_slice_unref_internal(exec_ctx, ep->read_staging_buffer);
 | 
	
		
			
				|  |  |    grpc_slice_unref_internal(exec_ctx, ep->write_staging_buffer);
 | 
	
	
		
			
				|  | @@ -159,51 +161,58 @@ static void on_read(grpc_exec_ctx *exec_ctx, void *user_data,
 | 
	
		
			
				|  |  |      return;
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  /* TODO(yangg) check error, maybe bail out early */
 | 
	
		
			
				|  |  | -  for (i = 0; i < ep->source_buffer.count; i++) {
 | 
	
		
			
				|  |  | -    grpc_slice encrypted = ep->source_buffer.slices[i];
 | 
	
		
			
				|  |  | -    uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted);
 | 
	
		
			
				|  |  | -    size_t message_size = GRPC_SLICE_LENGTH(encrypted);
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -    while (message_size > 0 || keep_looping) {
 | 
	
		
			
				|  |  | -      size_t unprotected_buffer_size_written = (size_t)(end - cur);
 | 
	
		
			
				|  |  | -      size_t processed_message_size = message_size;
 | 
	
		
			
				|  |  | -      gpr_mu_lock(&ep->protector_mu);
 | 
	
		
			
				|  |  | -      result = tsi_frame_protector_unprotect(ep->protector, message_bytes,
 | 
	
		
			
				|  |  | -                                             &processed_message_size, cur,
 | 
	
		
			
				|  |  | -                                             &unprotected_buffer_size_written);
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&ep->protector_mu);
 | 
	
		
			
				|  |  | -      if (result != TSI_OK) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_ERROR, "Decryption error: %s",
 | 
	
		
			
				|  |  | -                tsi_result_to_string(result));
 | 
	
		
			
				|  |  | -        break;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      message_bytes += processed_message_size;
 | 
	
		
			
				|  |  | -      message_size -= processed_message_size;
 | 
	
		
			
				|  |  | -      cur += unprotected_buffer_size_written;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      if (cur == end) {
 | 
	
		
			
				|  |  | -        flush_read_staging_buffer(ep, &cur, &end);
 | 
	
		
			
				|  |  | -        /* Force to enter the loop again to extract buffered bytes in protector.
 | 
	
		
			
				|  |  | -           The bytes could be buffered because of running out of staging_buffer.
 | 
	
		
			
				|  |  | -           If this happens at the end of all slices, doing another unprotect
 | 
	
		
			
				|  |  | -           avoids leaving data in the protector. */
 | 
	
		
			
				|  |  | -        keep_looping = 1;
 | 
	
		
			
				|  |  | -      } else if (unprotected_buffer_size_written > 0) {
 | 
	
		
			
				|  |  | -        keep_looping = 1;
 | 
	
		
			
				|  |  | -      } else {
 | 
	
		
			
				|  |  | -        keep_looping = 0;
 | 
	
		
			
				|  |  | +  if (ep->zero_copy_protector != NULL) {
 | 
	
		
			
				|  |  | +    // Use zero-copy grpc protector to unprotect.
 | 
	
		
			
				|  |  | +    result = tsi_zero_copy_grpc_protector_unprotect(
 | 
	
		
			
				|  |  | +        ep->zero_copy_protector, &ep->source_buffer, ep->read_buffer);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    // Use frame protector to unprotect.
 | 
	
		
			
				|  |  | +    /* TODO(yangg) check error, maybe bail out early */
 | 
	
		
			
				|  |  | +    for (i = 0; i < ep->source_buffer.count; i++) {
 | 
	
		
			
				|  |  | +      grpc_slice encrypted = ep->source_buffer.slices[i];
 | 
	
		
			
				|  |  | +      uint8_t *message_bytes = GRPC_SLICE_START_PTR(encrypted);
 | 
	
		
			
				|  |  | +      size_t message_size = GRPC_SLICE_LENGTH(encrypted);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +      while (message_size > 0 || keep_looping) {
 | 
	
		
			
				|  |  | +        size_t unprotected_buffer_size_written = (size_t)(end - cur);
 | 
	
		
			
				|  |  | +        size_t processed_message_size = message_size;
 | 
	
		
			
				|  |  | +        gpr_mu_lock(&ep->protector_mu);
 | 
	
		
			
				|  |  | +        result = tsi_frame_protector_unprotect(
 | 
	
		
			
				|  |  | +            ep->protector, message_bytes, &processed_message_size, cur,
 | 
	
		
			
				|  |  | +            &unprotected_buffer_size_written);
 | 
	
		
			
				|  |  | +        gpr_mu_unlock(&ep->protector_mu);
 | 
	
		
			
				|  |  | +        if (result != TSI_OK) {
 | 
	
		
			
				|  |  | +          gpr_log(GPR_ERROR, "Decryption error: %s",
 | 
	
		
			
				|  |  | +                  tsi_result_to_string(result));
 | 
	
		
			
				|  |  | +          break;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        message_bytes += processed_message_size;
 | 
	
		
			
				|  |  | +        message_size -= processed_message_size;
 | 
	
		
			
				|  |  | +        cur += unprotected_buffer_size_written;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (cur == end) {
 | 
	
		
			
				|  |  | +          flush_read_staging_buffer(ep, &cur, &end);
 | 
	
		
			
				|  |  | +          /* Force to enter the loop again to extract buffered bytes in
 | 
	
		
			
				|  |  | +             protector. The bytes could be buffered because of running out of
 | 
	
		
			
				|  |  | +             staging_buffer. If this happens at the end of all slices, doing
 | 
	
		
			
				|  |  | +             another unprotect avoids leaving data in the protector. */
 | 
	
		
			
				|  |  | +          keep_looping = 1;
 | 
	
		
			
				|  |  | +        } else if (unprotected_buffer_size_written > 0) {
 | 
	
		
			
				|  |  | +          keep_looping = 1;
 | 
	
		
			
				|  |  | +        } else {
 | 
	
		
			
				|  |  | +          keep_looping = 0;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | +      if (result != TSI_OK) break;
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  | -    if (result != TSI_OK) break;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
 | 
	
		
			
				|  |  | -    grpc_slice_buffer_add(
 | 
	
		
			
				|  |  | -        ep->read_buffer,
 | 
	
		
			
				|  |  | -        grpc_slice_split_head(
 | 
	
		
			
				|  |  | -            &ep->read_staging_buffer,
 | 
	
		
			
				|  |  | -            (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
 | 
	
		
			
				|  |  | +    if (cur != GRPC_SLICE_START_PTR(ep->read_staging_buffer)) {
 | 
	
		
			
				|  |  | +      grpc_slice_buffer_add(
 | 
	
		
			
				|  |  | +          ep->read_buffer,
 | 
	
		
			
				|  |  | +          grpc_slice_split_head(
 | 
	
		
			
				|  |  | +              &ep->read_staging_buffer,
 | 
	
		
			
				|  |  | +              (size_t)(cur - GRPC_SLICE_START_PTR(ep->read_staging_buffer))));
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    /* TODO(yangg) experiment with moving this block after read_cb to see if it
 | 
	
	
		
			
				|  | @@ -270,54 +279,62 @@ static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -  for (i = 0; i < slices->count; i++) {
 | 
	
		
			
				|  |  | -    grpc_slice plain = slices->slices[i];
 | 
	
		
			
				|  |  | -    uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain);
 | 
	
		
			
				|  |  | -    size_t message_size = GRPC_SLICE_LENGTH(plain);
 | 
	
		
			
				|  |  | -    while (message_size > 0) {
 | 
	
		
			
				|  |  | -      size_t protected_buffer_size_to_send = (size_t)(end - cur);
 | 
	
		
			
				|  |  | -      size_t processed_message_size = message_size;
 | 
	
		
			
				|  |  | -      gpr_mu_lock(&ep->protector_mu);
 | 
	
		
			
				|  |  | -      result = tsi_frame_protector_protect(ep->protector, message_bytes,
 | 
	
		
			
				|  |  | -                                           &processed_message_size, cur,
 | 
	
		
			
				|  |  | -                                           &protected_buffer_size_to_send);
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&ep->protector_mu);
 | 
	
		
			
				|  |  | -      if (result != TSI_OK) {
 | 
	
		
			
				|  |  | -        gpr_log(GPR_ERROR, "Encryption error: %s",
 | 
	
		
			
				|  |  | -                tsi_result_to_string(result));
 | 
	
		
			
				|  |  | -        break;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      message_bytes += processed_message_size;
 | 
	
		
			
				|  |  | -      message_size -= processed_message_size;
 | 
	
		
			
				|  |  | -      cur += protected_buffer_size_to_send;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -      if (cur == end) {
 | 
	
		
			
				|  |  | -        flush_write_staging_buffer(ep, &cur, &end);
 | 
	
		
			
				|  |  | +  if (ep->zero_copy_protector != NULL) {
 | 
	
		
			
				|  |  | +    // Use zero-copy grpc protector to protect.
 | 
	
		
			
				|  |  | +    result = tsi_zero_copy_grpc_protector_protect(ep->zero_copy_protector,
 | 
	
		
			
				|  |  | +                                                  slices, &ep->output_buffer);
 | 
	
		
			
				|  |  | +  } else {
 | 
	
		
			
				|  |  | +    // Use frame protector to protect.
 | 
	
		
			
				|  |  | +    for (i = 0; i < slices->count; i++) {
 | 
	
		
			
				|  |  | +      grpc_slice plain = slices->slices[i];
 | 
	
		
			
				|  |  | +      uint8_t *message_bytes = GRPC_SLICE_START_PTR(plain);
 | 
	
		
			
				|  |  | +      size_t message_size = GRPC_SLICE_LENGTH(plain);
 | 
	
		
			
				|  |  | +      while (message_size > 0) {
 | 
	
		
			
				|  |  | +        size_t protected_buffer_size_to_send = (size_t)(end - cur);
 | 
	
		
			
				|  |  | +        size_t processed_message_size = message_size;
 | 
	
		
			
				|  |  | +        gpr_mu_lock(&ep->protector_mu);
 | 
	
		
			
				|  |  | +        result = tsi_frame_protector_protect(ep->protector, message_bytes,
 | 
	
		
			
				|  |  | +                                             &processed_message_size, cur,
 | 
	
		
			
				|  |  | +                                             &protected_buffer_size_to_send);
 | 
	
		
			
				|  |  | +        gpr_mu_unlock(&ep->protector_mu);
 | 
	
		
			
				|  |  | +        if (result != TSI_OK) {
 | 
	
		
			
				|  |  | +          gpr_log(GPR_ERROR, "Encryption error: %s",
 | 
	
		
			
				|  |  | +                  tsi_result_to_string(result));
 | 
	
		
			
				|  |  | +          break;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        message_bytes += processed_message_size;
 | 
	
		
			
				|  |  | +        message_size -= processed_message_size;
 | 
	
		
			
				|  |  | +        cur += protected_buffer_size_to_send;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (cur == end) {
 | 
	
		
			
				|  |  | +          flush_write_staging_buffer(ep, &cur, &end);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    if (result != TSI_OK) break;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (result == TSI_OK) {
 | 
	
		
			
				|  |  | -    size_t still_pending_size;
 | 
	
		
			
				|  |  | -    do {
 | 
	
		
			
				|  |  | -      size_t protected_buffer_size_to_send = (size_t)(end - cur);
 | 
	
		
			
				|  |  | -      gpr_mu_lock(&ep->protector_mu);
 | 
	
		
			
				|  |  | -      result = tsi_frame_protector_protect_flush(ep->protector, cur,
 | 
	
		
			
				|  |  | -                                                 &protected_buffer_size_to_send,
 | 
	
		
			
				|  |  | -                                                 &still_pending_size);
 | 
	
		
			
				|  |  | -      gpr_mu_unlock(&ep->protector_mu);
 | 
	
		
			
				|  |  |        if (result != TSI_OK) break;
 | 
	
		
			
				|  |  | -      cur += protected_buffer_size_to_send;
 | 
	
		
			
				|  |  | -      if (cur == end) {
 | 
	
		
			
				|  |  | -        flush_write_staging_buffer(ep, &cur, &end);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (result == TSI_OK) {
 | 
	
		
			
				|  |  | +      size_t still_pending_size;
 | 
	
		
			
				|  |  | +      do {
 | 
	
		
			
				|  |  | +        size_t protected_buffer_size_to_send = (size_t)(end - cur);
 | 
	
		
			
				|  |  | +        gpr_mu_lock(&ep->protector_mu);
 | 
	
		
			
				|  |  | +        result = tsi_frame_protector_protect_flush(
 | 
	
		
			
				|  |  | +            ep->protector, cur, &protected_buffer_size_to_send,
 | 
	
		
			
				|  |  | +            &still_pending_size);
 | 
	
		
			
				|  |  | +        gpr_mu_unlock(&ep->protector_mu);
 | 
	
		
			
				|  |  | +        if (result != TSI_OK) break;
 | 
	
		
			
				|  |  | +        cur += protected_buffer_size_to_send;
 | 
	
		
			
				|  |  | +        if (cur == end) {
 | 
	
		
			
				|  |  | +          flush_write_staging_buffer(ep, &cur, &end);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +      } while (still_pending_size > 0);
 | 
	
		
			
				|  |  | +      if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
 | 
	
		
			
				|  |  | +        grpc_slice_buffer_add(
 | 
	
		
			
				|  |  | +            &ep->output_buffer,
 | 
	
		
			
				|  |  | +            grpc_slice_split_head(
 | 
	
		
			
				|  |  | +                &ep->write_staging_buffer,
 | 
	
		
			
				|  |  | +                (size_t)(cur -
 | 
	
		
			
				|  |  | +                         GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
 | 
	
		
			
				|  |  |        }
 | 
	
		
			
				|  |  | -    } while (still_pending_size > 0);
 | 
	
		
			
				|  |  | -    if (cur != GRPC_SLICE_START_PTR(ep->write_staging_buffer)) {
 | 
	
		
			
				|  |  | -      grpc_slice_buffer_add(
 | 
	
		
			
				|  |  | -          &ep->output_buffer,
 | 
	
		
			
				|  |  | -          grpc_slice_split_head(
 | 
	
		
			
				|  |  | -              &ep->write_staging_buffer,
 | 
	
		
			
				|  |  | -              (size_t)(cur - GRPC_SLICE_START_PTR(ep->write_staging_buffer))));
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |    }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -389,13 +406,16 @@ static const grpc_endpoint_vtable vtable = {endpoint_read,
 | 
	
		
			
				|  |  |                                              endpoint_get_fd};
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  grpc_endpoint *grpc_secure_endpoint_create(
 | 
	
		
			
				|  |  | -    struct tsi_frame_protector *protector, grpc_endpoint *transport,
 | 
	
		
			
				|  |  | -    grpc_slice *leftover_slices, size_t leftover_nslices) {
 | 
	
		
			
				|  |  | +    struct tsi_frame_protector *protector,
 | 
	
		
			
				|  |  | +    struct tsi_zero_copy_grpc_protector *zero_copy_protector,
 | 
	
		
			
				|  |  | +    grpc_endpoint *transport, grpc_slice *leftover_slices,
 | 
	
		
			
				|  |  | +    size_t leftover_nslices) {
 | 
	
		
			
				|  |  |    size_t i;
 | 
	
		
			
				|  |  |    secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint));
 | 
	
		
			
				|  |  |    ep->base.vtable = &vtable;
 | 
	
		
			
				|  |  |    ep->wrapped_ep = transport;
 | 
	
		
			
				|  |  |    ep->protector = protector;
 | 
	
		
			
				|  |  | +  ep->zero_copy_protector = zero_copy_protector;
 | 
	
		
			
				|  |  |    grpc_slice_buffer_init(&ep->leftover_bytes);
 | 
	
		
			
				|  |  |    for (i = 0; i < leftover_nslices; i++) {
 | 
	
		
			
				|  |  |      grpc_slice_buffer_add(&ep->leftover_bytes,
 |