|  | @@ -1,574 +0,0 @@
 | 
	
		
			
				|  |  | -/*
 | 
	
		
			
				|  |  | - *
 | 
	
		
			
				|  |  | - * Copyright 2015 gRPC authors.
 | 
	
		
			
				|  |  | - *
 | 
	
		
			
				|  |  | - * Licensed under the Apache License, Version 2.0 (the "License");
 | 
	
		
			
				|  |  | - * you may not use this file except in compliance with the License.
 | 
	
		
			
				|  |  | - * You may obtain a copy of the License at
 | 
	
		
			
				|  |  | - *
 | 
	
		
			
				|  |  | - *     http://www.apache.org/licenses/LICENSE-2.0
 | 
	
		
			
				|  |  | - *
 | 
	
		
			
				|  |  | - * Unless required by applicable law or agreed to in writing, software
 | 
	
		
			
				|  |  | - * distributed under the License is distributed on an "AS IS" BASIS,
 | 
	
		
			
				|  |  | - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
	
		
			
				|  |  | - * See the License for the specific language governing permissions and
 | 
	
		
			
				|  |  | - * limitations under the License.
 | 
	
		
			
				|  |  | - *
 | 
	
		
			
				|  |  | - */
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -#include "src/core/ext/census/mlog.h"
 | 
	
		
			
				|  |  | -#include <grpc/support/cpu.h>
 | 
	
		
			
				|  |  | -#include <grpc/support/log.h>
 | 
	
		
			
				|  |  | -#include <grpc/support/port_platform.h>
 | 
	
		
			
				|  |  | -#include <grpc/support/sync.h>
 | 
	
		
			
				|  |  | -#include <grpc/support/thd.h>
 | 
	
		
			
				|  |  | -#include <grpc/support/time.h>
 | 
	
		
			
				|  |  | -#include <grpc/support/useful.h>
 | 
	
		
			
				|  |  | -#include <stdio.h>
 | 
	
		
			
				|  |  | -#include <stdlib.h>
 | 
	
		
			
				|  |  | -#include <string.h>
 | 
	
		
			
				|  |  | -#include "test/core/util/test_config.h"
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Change this to non-zero if you want more output.
 | 
	
		
			
				|  |  | -#define VERBOSE 0
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Log size to use for all tests.
 | 
	
		
			
				|  |  | -#define LOG_SIZE_IN_MB 1
 | 
	
		
			
				|  |  | -#define LOG_SIZE_IN_BYTES (LOG_SIZE_IN_MB << 20)
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Fills in 'record' of size 'size'. Each byte in record is filled in with the
 | 
	
		
			
				|  |  | -// same value. The value is extracted from 'record' pointer.
 | 
	
		
			
				|  |  | -static void write_record(char* record, size_t size) {
 | 
	
		
			
				|  |  | -  char data = (char)((uintptr_t)record % 255);
 | 
	
		
			
				|  |  | -  memset(record, data, size);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Reads fixed size records. Returns the number of records read in
 | 
	
		
			
				|  |  | -// 'num_records'.
 | 
	
		
			
				|  |  | -static void read_records(size_t record_size, const char* buffer,
 | 
	
		
			
				|  |  | -                         size_t buffer_size, int* num_records) {
 | 
	
		
			
				|  |  | -  GPR_ASSERT(buffer_size >= record_size);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(buffer_size % record_size == 0);
 | 
	
		
			
				|  |  | -  *num_records = (int)(buffer_size / record_size);
 | 
	
		
			
				|  |  | -  for (int i = 0; i < *num_records; ++i) {
 | 
	
		
			
				|  |  | -    const char* record = buffer + (record_size * (size_t)i);
 | 
	
		
			
				|  |  | -    char data = (char)((uintptr_t)record % 255);
 | 
	
		
			
				|  |  | -    for (size_t j = 0; j < record_size; ++j) {
 | 
	
		
			
				|  |  | -      GPR_ASSERT(data == record[j]);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Tries to write the specified number of records. Stops when the log gets
 | 
	
		
			
				|  |  | -// full. Returns the number of records written. Spins for random
 | 
	
		
			
				|  |  | -// number of times, up to 'max_spin_count', between writes.
 | 
	
		
			
				|  |  | -static int write_records_to_log(int writer_id, size_t record_size,
 | 
	
		
			
				|  |  | -                                int num_records, int max_spin_count) {
 | 
	
		
			
				|  |  | -  int counter = 0;
 | 
	
		
			
				|  |  | -  for (int i = 0; i < num_records; ++i) {
 | 
	
		
			
				|  |  | -    int spin_count = max_spin_count ? rand() % max_spin_count : 0;
 | 
	
		
			
				|  |  | -    if (VERBOSE && (counter++ == num_records / 10)) {
 | 
	
		
			
				|  |  | -      printf("   Writer %d: %d out of %d written\n", writer_id, i, num_records);
 | 
	
		
			
				|  |  | -      counter = 0;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    char* record = (char*)(census_log_start_write(record_size));
 | 
	
		
			
				|  |  | -    if (record == NULL) {
 | 
	
		
			
				|  |  | -      return i;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    write_record(record, record_size);
 | 
	
		
			
				|  |  | -    census_log_end_write(record, record_size);
 | 
	
		
			
				|  |  | -    for (int j = 0; j < spin_count; ++j) {
 | 
	
		
			
				|  |  | -      GPR_ASSERT(j >= 0);
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  return num_records;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Performs a single read iteration. Returns the number of records read.
 | 
	
		
			
				|  |  | -static int perform_read_iteration(size_t record_size) {
 | 
	
		
			
				|  |  | -  const void* read_buffer = NULL;
 | 
	
		
			
				|  |  | -  size_t bytes_available;
 | 
	
		
			
				|  |  | -  int records_read = 0;
 | 
	
		
			
				|  |  | -  census_log_init_reader();
 | 
	
		
			
				|  |  | -  while ((read_buffer = census_log_read_next(&bytes_available))) {
 | 
	
		
			
				|  |  | -    int num_records = 0;
 | 
	
		
			
				|  |  | -    read_records(record_size, (const char*)read_buffer, bytes_available,
 | 
	
		
			
				|  |  | -                 &num_records);
 | 
	
		
			
				|  |  | -    records_read += num_records;
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  return records_read;
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Asserts that the log is empty.
 | 
	
		
			
				|  |  | -static void assert_log_empty(void) {
 | 
	
		
			
				|  |  | -  census_log_init_reader();
 | 
	
		
			
				|  |  | -  size_t bytes_available;
 | 
	
		
			
				|  |  | -  GPR_ASSERT(census_log_read_next(&bytes_available) == NULL);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Fills the log and verifies data. If 'no fragmentation' is true, records
 | 
	
		
			
				|  |  | -// are sized such that CENSUS_LOG_2_MAX_RECORD_SIZE is a multiple of record
 | 
	
		
			
				|  |  | -// size. If not a circular log, verifies that the number of records written
 | 
	
		
			
				|  |  | -// match the number of records read.
 | 
	
		
			
				|  |  | -static void fill_log(size_t log_size, int no_fragmentation, int circular_log) {
 | 
	
		
			
				|  |  | -  size_t size;
 | 
	
		
			
				|  |  | -  if (no_fragmentation) {
 | 
	
		
			
				|  |  | -    int log2size = rand() % (CENSUS_LOG_2_MAX_RECORD_SIZE + 1);
 | 
	
		
			
				|  |  | -    size = ((size_t)1 << log2size);
 | 
	
		
			
				|  |  | -  } else {
 | 
	
		
			
				|  |  | -    while (1) {
 | 
	
		
			
				|  |  | -      size = 1 + ((size_t)rand() % CENSUS_LOG_MAX_RECORD_SIZE);
 | 
	
		
			
				|  |  | -      if (CENSUS_LOG_MAX_RECORD_SIZE % size) {
 | 
	
		
			
				|  |  | -        break;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  int records_written =
 | 
	
		
			
				|  |  | -      write_records_to_log(0 /* writer id */, size,
 | 
	
		
			
				|  |  | -                           (int)((log_size / size) * 2), 0 /* spin count */);
 | 
	
		
			
				|  |  | -  int records_read = perform_read_iteration(size);
 | 
	
		
			
				|  |  | -  if (!circular_log) {
 | 
	
		
			
				|  |  | -    GPR_ASSERT(records_written == records_read);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  assert_log_empty();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Structure to pass args to writer_thread
 | 
	
		
			
				|  |  | -typedef struct writer_thread_args {
 | 
	
		
			
				|  |  | -  // Index of this thread in the writers vector.
 | 
	
		
			
				|  |  | -  int index;
 | 
	
		
			
				|  |  | -  // Record size.
 | 
	
		
			
				|  |  | -  size_t record_size;
 | 
	
		
			
				|  |  | -  // Number of records to write.
 | 
	
		
			
				|  |  | -  int num_records;
 | 
	
		
			
				|  |  | -  // Used to signal when writer is complete
 | 
	
		
			
				|  |  | -  gpr_cv* done;
 | 
	
		
			
				|  |  | -  gpr_mu* mu;
 | 
	
		
			
				|  |  | -  int* count;
 | 
	
		
			
				|  |  | -} writer_thread_args;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Writes the given number of records of random size (up to kMaxRecordSize) and
 | 
	
		
			
				|  |  | -// random data to the specified log.
 | 
	
		
			
				|  |  | -static void writer_thread(void* arg) {
 | 
	
		
			
				|  |  | -  writer_thread_args* args = (writer_thread_args*)arg;
 | 
	
		
			
				|  |  | -  // Maximum number of times to spin between writes.
 | 
	
		
			
				|  |  | -  static const int MAX_SPIN_COUNT = 50;
 | 
	
		
			
				|  |  | -  int records_written = 0;
 | 
	
		
			
				|  |  | -  if (VERBOSE) {
 | 
	
		
			
				|  |  | -    printf("   Writer %d starting\n", args->index);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  while (records_written < args->num_records) {
 | 
	
		
			
				|  |  | -    records_written += write_records_to_log(args->index, args->record_size,
 | 
	
		
			
				|  |  | -                                            args->num_records - records_written,
 | 
	
		
			
				|  |  | -                                            MAX_SPIN_COUNT);
 | 
	
		
			
				|  |  | -    if (records_written < args->num_records) {
 | 
	
		
			
				|  |  | -      // Ran out of log space. Sleep for a bit and let the reader catch up.
 | 
	
		
			
				|  |  | -      // This should never happen for circular logs.
 | 
	
		
			
				|  |  | -      if (VERBOSE) {
 | 
	
		
			
				|  |  | -        printf(
 | 
	
		
			
				|  |  | -            "   Writer %d stalled due to out-of-space: %d out of %d "
 | 
	
		
			
				|  |  | -            "written\n",
 | 
	
		
			
				|  |  | -            args->index, records_written, args->num_records);
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      gpr_sleep_until(grpc_timeout_milliseconds_to_deadline(10));
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  // Done. Decrement count and signal.
 | 
	
		
			
				|  |  | -  gpr_mu_lock(args->mu);
 | 
	
		
			
				|  |  | -  (*args->count)--;
 | 
	
		
			
				|  |  | -  gpr_cv_signal(args->done);
 | 
	
		
			
				|  |  | -  if (VERBOSE) {
 | 
	
		
			
				|  |  | -    printf("   Writer %d done\n", args->index);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(args->mu);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// struct to pass args to reader_thread
 | 
	
		
			
				|  |  | -typedef struct reader_thread_args {
 | 
	
		
			
				|  |  | -  // Record size.
 | 
	
		
			
				|  |  | -  size_t record_size;
 | 
	
		
			
				|  |  | -  // Interval between read iterations.
 | 
	
		
			
				|  |  | -  int read_iteration_interval_in_msec;
 | 
	
		
			
				|  |  | -  // Total number of records.
 | 
	
		
			
				|  |  | -  int total_records;
 | 
	
		
			
				|  |  | -  // Signalled when reader should stop.
 | 
	
		
			
				|  |  | -  gpr_cv stop;
 | 
	
		
			
				|  |  | -  int stop_flag;
 | 
	
		
			
				|  |  | -  // Used to signal when reader has finished
 | 
	
		
			
				|  |  | -  gpr_cv* done;
 | 
	
		
			
				|  |  | -  gpr_mu* mu;
 | 
	
		
			
				|  |  | -  int running;
 | 
	
		
			
				|  |  | -} reader_thread_args;
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Reads and verifies the specified number of records. Reader can also be
 | 
	
		
			
				|  |  | -// stopped via gpr_cv_signal(&args->stop). Sleeps for 'read_interval_in_msec'
 | 
	
		
			
				|  |  | -// between read iterations.
 | 
	
		
			
				|  |  | -static void reader_thread(void* arg) {
 | 
	
		
			
				|  |  | -  reader_thread_args* args = (reader_thread_args*)arg;
 | 
	
		
			
				|  |  | -  if (VERBOSE) {
 | 
	
		
			
				|  |  | -    printf("   Reader starting\n");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_timespec interval = gpr_time_from_micros(
 | 
	
		
			
				|  |  | -      args->read_iteration_interval_in_msec * 1000, GPR_TIMESPAN);
 | 
	
		
			
				|  |  | -  gpr_mu_lock(args->mu);
 | 
	
		
			
				|  |  | -  int records_read = 0;
 | 
	
		
			
				|  |  | -  int num_iterations = 0;
 | 
	
		
			
				|  |  | -  int counter = 0;
 | 
	
		
			
				|  |  | -  while (!args->stop_flag && records_read < args->total_records) {
 | 
	
		
			
				|  |  | -    gpr_cv_wait(&args->stop, args->mu, interval);
 | 
	
		
			
				|  |  | -    if (!args->stop_flag) {
 | 
	
		
			
				|  |  | -      records_read += perform_read_iteration(args->record_size);
 | 
	
		
			
				|  |  | -      GPR_ASSERT(records_read <= args->total_records);
 | 
	
		
			
				|  |  | -      if (VERBOSE && (counter++ == 100000)) {
 | 
	
		
			
				|  |  | -        printf("   Reader: %d out of %d read\n", records_read,
 | 
	
		
			
				|  |  | -               args->total_records);
 | 
	
		
			
				|  |  | -        counter = 0;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      ++num_iterations;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  // Done
 | 
	
		
			
				|  |  | -  args->running = 0;
 | 
	
		
			
				|  |  | -  gpr_cv_signal(args->done);
 | 
	
		
			
				|  |  | -  if (VERBOSE) {
 | 
	
		
			
				|  |  | -    printf("   Reader: records: %d, iterations: %d\n", records_read,
 | 
	
		
			
				|  |  | -           num_iterations);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(args->mu);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Creates NUM_WRITERS writers where each writer writes NUM_RECORDS_PER_WRITER
 | 
	
		
			
				|  |  | -// records. Also, starts a reader that iterates over and reads blocks every
 | 
	
		
			
				|  |  | -// READ_ITERATION_INTERVAL_IN_MSEC.
 | 
	
		
			
				|  |  | -// Number of writers.
 | 
	
		
			
				|  |  | -#define NUM_WRITERS 5
 | 
	
		
			
				|  |  | -static void multiple_writers_single_reader(int circular_log) {
 | 
	
		
			
				|  |  | -  // Sleep interval between read iterations.
 | 
	
		
			
				|  |  | -  static const int READ_ITERATION_INTERVAL_IN_MSEC = 10;
 | 
	
		
			
				|  |  | -  // Maximum record size.
 | 
	
		
			
				|  |  | -  static const size_t MAX_RECORD_SIZE = 20;
 | 
	
		
			
				|  |  | -  // Number of records written by each writer. This is sized such that we
 | 
	
		
			
				|  |  | -  // will write through the entire log ~10 times.
 | 
	
		
			
				|  |  | -  const int NUM_RECORDS_PER_WRITER =
 | 
	
		
			
				|  |  | -      (int)((10 * census_log_remaining_space()) / (MAX_RECORD_SIZE / 2)) /
 | 
	
		
			
				|  |  | -      NUM_WRITERS;
 | 
	
		
			
				|  |  | -  size_t record_size = ((size_t)rand() % MAX_RECORD_SIZE) + 1;
 | 
	
		
			
				|  |  | -  // Create and start writers.
 | 
	
		
			
				|  |  | -  writer_thread_args writers[NUM_WRITERS];
 | 
	
		
			
				|  |  | -  int writers_count = NUM_WRITERS;
 | 
	
		
			
				|  |  | -  gpr_cv writers_done;
 | 
	
		
			
				|  |  | -  gpr_mu writers_mu;  // protects writers_done and writers_count
 | 
	
		
			
				|  |  | -  gpr_cv_init(&writers_done);
 | 
	
		
			
				|  |  | -  gpr_mu_init(&writers_mu);
 | 
	
		
			
				|  |  | -  gpr_thd_id id;
 | 
	
		
			
				|  |  | -  for (int i = 0; i < NUM_WRITERS; ++i) {
 | 
	
		
			
				|  |  | -    writers[i].index = i;
 | 
	
		
			
				|  |  | -    writers[i].record_size = record_size;
 | 
	
		
			
				|  |  | -    writers[i].num_records = NUM_RECORDS_PER_WRITER;
 | 
	
		
			
				|  |  | -    writers[i].done = &writers_done;
 | 
	
		
			
				|  |  | -    writers[i].count = &writers_count;
 | 
	
		
			
				|  |  | -    writers[i].mu = &writers_mu;
 | 
	
		
			
				|  |  | -    gpr_thd_new(&id, &writer_thread, &writers[i], NULL);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  // Start reader.
 | 
	
		
			
				|  |  | -  gpr_cv reader_done;
 | 
	
		
			
				|  |  | -  gpr_mu reader_mu;  // protects reader_done and reader.running
 | 
	
		
			
				|  |  | -  reader_thread_args reader;
 | 
	
		
			
				|  |  | -  reader.record_size = record_size;
 | 
	
		
			
				|  |  | -  reader.read_iteration_interval_in_msec = READ_ITERATION_INTERVAL_IN_MSEC;
 | 
	
		
			
				|  |  | -  reader.total_records = NUM_WRITERS * NUM_RECORDS_PER_WRITER;
 | 
	
		
			
				|  |  | -  reader.stop_flag = 0;
 | 
	
		
			
				|  |  | -  gpr_cv_init(&reader.stop);
 | 
	
		
			
				|  |  | -  gpr_cv_init(&reader_done);
 | 
	
		
			
				|  |  | -  reader.done = &reader_done;
 | 
	
		
			
				|  |  | -  gpr_mu_init(&reader_mu);
 | 
	
		
			
				|  |  | -  reader.mu = &reader_mu;
 | 
	
		
			
				|  |  | -  reader.running = 1;
 | 
	
		
			
				|  |  | -  gpr_thd_new(&id, &reader_thread, &reader, NULL);
 | 
	
		
			
				|  |  | -  // Wait for writers to finish.
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&writers_mu);
 | 
	
		
			
				|  |  | -  while (writers_count != 0) {
 | 
	
		
			
				|  |  | -    gpr_cv_wait(&writers_done, &writers_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&writers_mu);
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&writers_mu);
 | 
	
		
			
				|  |  | -  gpr_cv_destroy(&writers_done);
 | 
	
		
			
				|  |  | -  gpr_mu_lock(&reader_mu);
 | 
	
		
			
				|  |  | -  if (circular_log) {
 | 
	
		
			
				|  |  | -    // Stop reader.
 | 
	
		
			
				|  |  | -    reader.stop_flag = 1;
 | 
	
		
			
				|  |  | -    gpr_cv_signal(&reader.stop);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  // wait for reader to finish
 | 
	
		
			
				|  |  | -  while (reader.running) {
 | 
	
		
			
				|  |  | -    gpr_cv_wait(&reader_done, &reader_mu, gpr_inf_future(GPR_CLOCK_REALTIME));
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  if (circular_log) {
 | 
	
		
			
				|  |  | -    // Assert that there were no out-of-space errors.
 | 
	
		
			
				|  |  | -    GPR_ASSERT(0 == census_log_out_of_space_count());
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  gpr_mu_unlock(&reader_mu);
 | 
	
		
			
				|  |  | -  gpr_mu_destroy(&reader_mu);
 | 
	
		
			
				|  |  | -  gpr_cv_destroy(&reader_done);
 | 
	
		
			
				|  |  | -  if (VERBOSE) {
 | 
	
		
			
				|  |  | -    printf("   Reader: finished\n");
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -static void setup_test(int circular_log) {
 | 
	
		
			
				|  |  | -  census_log_initialize(LOG_SIZE_IN_MB, circular_log);
 | 
	
		
			
				|  |  | -  //  GPR_ASSERT(census_log_remaining_space() == LOG_SIZE_IN_BYTES);
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Attempts to create a record of invalid size (size >
 | 
	
		
			
				|  |  | -// CENSUS_LOG_MAX_RECORD_SIZE).
 | 
	
		
			
				|  |  | -void test_invalid_record_size(void) {
 | 
	
		
			
				|  |  | -  static const size_t INVALID_SIZE = CENSUS_LOG_MAX_RECORD_SIZE + 1;
 | 
	
		
			
				|  |  | -  static const size_t VALID_SIZE = 1;
 | 
	
		
			
				|  |  | -  printf("Starting test: invalid record size\n");
 | 
	
		
			
				|  |  | -  setup_test(0);
 | 
	
		
			
				|  |  | -  void* record = census_log_start_write(INVALID_SIZE);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record == NULL);
 | 
	
		
			
				|  |  | -  // Now try writing a valid record.
 | 
	
		
			
				|  |  | -  record = census_log_start_write(VALID_SIZE);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record != NULL);
 | 
	
		
			
				|  |  | -  census_log_end_write(record, VALID_SIZE);
 | 
	
		
			
				|  |  | -  // Verifies that available space went down by one block. In theory, this
 | 
	
		
			
				|  |  | -  // check can fail if the thread is context switched to a new CPU during the
 | 
	
		
			
				|  |  | -  // start_write execution (multiple blocks get allocated), but this has not
 | 
	
		
			
				|  |  | -  // been observed in practice.
 | 
	
		
			
				|  |  | -  //  GPR_ASSERT(LOG_SIZE_IN_BYTES - CENSUS_LOG_MAX_RECORD_SIZE ==
 | 
	
		
			
				|  |  | -  //             census_log_remaining_space());
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Tests end_write() with a different size than what was specified in
 | 
	
		
			
				|  |  | -// start_write().
 | 
	
		
			
				|  |  | -void test_end_write_with_different_size(void) {
 | 
	
		
			
				|  |  | -  static const size_t START_WRITE_SIZE = 10;
 | 
	
		
			
				|  |  | -  static const size_t END_WRITE_SIZE = 7;
 | 
	
		
			
				|  |  | -  printf("Starting test: end write with different size\n");
 | 
	
		
			
				|  |  | -  setup_test(0);
 | 
	
		
			
				|  |  | -  void* record_written = census_log_start_write(START_WRITE_SIZE);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record_written != NULL);
 | 
	
		
			
				|  |  | -  census_log_end_write(record_written, END_WRITE_SIZE);
 | 
	
		
			
				|  |  | -  census_log_init_reader();
 | 
	
		
			
				|  |  | -  size_t bytes_available;
 | 
	
		
			
				|  |  | -  const void* record_read = census_log_read_next(&bytes_available);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record_written == record_read);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(END_WRITE_SIZE == bytes_available);
 | 
	
		
			
				|  |  | -  assert_log_empty();
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Verifies that pending records are not available via read_next().
 | 
	
		
			
				|  |  | -void test_read_pending_record(void) {
 | 
	
		
			
				|  |  | -  static const size_t PR_RECORD_SIZE = 1024;
 | 
	
		
			
				|  |  | -  printf("Starting test: read pending record\n");
 | 
	
		
			
				|  |  | -  setup_test(0);
 | 
	
		
			
				|  |  | -  // Start a write.
 | 
	
		
			
				|  |  | -  void* record_written = census_log_start_write(PR_RECORD_SIZE);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record_written != NULL);
 | 
	
		
			
				|  |  | -  // As write is pending, read should fail.
 | 
	
		
			
				|  |  | -  census_log_init_reader();
 | 
	
		
			
				|  |  | -  size_t bytes_available;
 | 
	
		
			
				|  |  | -  const void* record_read = census_log_read_next(&bytes_available);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record_read == NULL);
 | 
	
		
			
				|  |  | -  // A read followed by end_write() should succeed.
 | 
	
		
			
				|  |  | -  census_log_end_write(record_written, PR_RECORD_SIZE);
 | 
	
		
			
				|  |  | -  census_log_init_reader();
 | 
	
		
			
				|  |  | -  record_read = census_log_read_next(&bytes_available);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record_written == record_read);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(PR_RECORD_SIZE == bytes_available);
 | 
	
		
			
				|  |  | -  assert_log_empty();
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Tries reading beyond pending write.
 | 
	
		
			
				|  |  | -void test_read_beyond_pending_record(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: read beyond pending record\n");
 | 
	
		
			
				|  |  | -  setup_test(0);
 | 
	
		
			
				|  |  | -  // Start a write.
 | 
	
		
			
				|  |  | -  const size_t incomplete_record_size = 10;
 | 
	
		
			
				|  |  | -  void* incomplete_record = census_log_start_write(incomplete_record_size);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(incomplete_record != NULL);
 | 
	
		
			
				|  |  | -  const size_t complete_record_size = 20;
 | 
	
		
			
				|  |  | -  void* complete_record = census_log_start_write(complete_record_size);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(complete_record != NULL);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(complete_record != incomplete_record);
 | 
	
		
			
				|  |  | -  census_log_end_write(complete_record, complete_record_size);
 | 
	
		
			
				|  |  | -  // Now iterate over blocks to read completed records.
 | 
	
		
			
				|  |  | -  census_log_init_reader();
 | 
	
		
			
				|  |  | -  size_t bytes_available;
 | 
	
		
			
				|  |  | -  const void* record_read = census_log_read_next(&bytes_available);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(complete_record == record_read);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(complete_record_size == bytes_available);
 | 
	
		
			
				|  |  | -  // Complete first record.
 | 
	
		
			
				|  |  | -  census_log_end_write(incomplete_record, incomplete_record_size);
 | 
	
		
			
				|  |  | -  // Have read past the incomplete record, so read_next() should return NULL.
 | 
	
		
			
				|  |  | -  // NB: this test also assumes our thread did not get switched to a different
 | 
	
		
			
				|  |  | -  // CPU between the two start_write calls
 | 
	
		
			
				|  |  | -  record_read = census_log_read_next(&bytes_available);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record_read == NULL);
 | 
	
		
			
				|  |  | -  // Reset reader to get the newly completed record.
 | 
	
		
			
				|  |  | -  census_log_init_reader();
 | 
	
		
			
				|  |  | -  record_read = census_log_read_next(&bytes_available);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(incomplete_record == record_read);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(incomplete_record_size == bytes_available);
 | 
	
		
			
				|  |  | -  assert_log_empty();
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Tests scenario where block being read is detached from a core and put on the
 | 
	
		
			
				|  |  | -// dirty list.
 | 
	
		
			
				|  |  | -void test_detached_while_reading(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: detached while reading\n");
 | 
	
		
			
				|  |  | -  setup_test(0);
 | 
	
		
			
				|  |  | -  // Start a write.
 | 
	
		
			
				|  |  | -  static const size_t DWR_RECORD_SIZE = 10;
 | 
	
		
			
				|  |  | -  void* record_written = census_log_start_write(DWR_RECORD_SIZE);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record_written != NULL);
 | 
	
		
			
				|  |  | -  census_log_end_write(record_written, DWR_RECORD_SIZE);
 | 
	
		
			
				|  |  | -  // Read this record.
 | 
	
		
			
				|  |  | -  census_log_init_reader();
 | 
	
		
			
				|  |  | -  size_t bytes_available;
 | 
	
		
			
				|  |  | -  const void* record_read = census_log_read_next(&bytes_available);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(record_read != NULL);
 | 
	
		
			
				|  |  | -  GPR_ASSERT(DWR_RECORD_SIZE == bytes_available);
 | 
	
		
			
				|  |  | -  // Now fill the log. This will move the block being read from core-local
 | 
	
		
			
				|  |  | -  // array to the dirty list.
 | 
	
		
			
				|  |  | -  while ((record_written = census_log_start_write(DWR_RECORD_SIZE))) {
 | 
	
		
			
				|  |  | -    census_log_end_write(record_written, DWR_RECORD_SIZE);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -  // In this iteration, read_next() should only traverse blocks in the
 | 
	
		
			
				|  |  | -  // core-local array. Therefore, we expect at most gpr_cpu_num_cores() more
 | 
	
		
			
				|  |  | -  // blocks. As log is full, if read_next() is traversing the dirty list, we
 | 
	
		
			
				|  |  | -  // will get more than gpr_cpu_num_cores() blocks.
 | 
	
		
			
				|  |  | -  int block_read = 0;
 | 
	
		
			
				|  |  | -  while ((record_read = census_log_read_next(&bytes_available))) {
 | 
	
		
			
				|  |  | -    ++block_read;
 | 
	
		
			
				|  |  | -    GPR_ASSERT(block_read <= (int)gpr_cpu_num_cores());
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Fills non-circular log with records sized such that size is a multiple of
 | 
	
		
			
				|  |  | -// CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation).
 | 
	
		
			
				|  |  | -void test_fill_log_no_fragmentation(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: fill log no fragmentation\n");
 | 
	
		
			
				|  |  | -  const int circular = 0;
 | 
	
		
			
				|  |  | -  setup_test(circular);
 | 
	
		
			
				|  |  | -  fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Fills circular log with records sized such that size is a multiple of
 | 
	
		
			
				|  |  | -// CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation).
 | 
	
		
			
				|  |  | -void test_fill_circular_log_no_fragmentation(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: fill circular log no fragmentation\n");
 | 
	
		
			
				|  |  | -  const int circular = 1;
 | 
	
		
			
				|  |  | -  setup_test(circular);
 | 
	
		
			
				|  |  | -  fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Fills non-circular log with records that may straddle end of a block.
 | 
	
		
			
				|  |  | -void test_fill_log_with_straddling_records(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: fill log with straddling records\n");
 | 
	
		
			
				|  |  | -  const int circular = 0;
 | 
	
		
			
				|  |  | -  setup_test(circular);
 | 
	
		
			
				|  |  | -  fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Fills circular log with records that may straddle end of a block.
 | 
	
		
			
				|  |  | -void test_fill_circular_log_with_straddling_records(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: fill circular log with straddling records\n");
 | 
	
		
			
				|  |  | -  const int circular = 1;
 | 
	
		
			
				|  |  | -  setup_test(circular);
 | 
	
		
			
				|  |  | -  fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Tests scenario where multiple writers and a single reader are using a log
 | 
	
		
			
				|  |  | -// that is configured to discard old records.
 | 
	
		
			
				|  |  | -void test_multiple_writers_circular_log(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: multiple writers circular log\n");
 | 
	
		
			
				|  |  | -  const int circular = 1;
 | 
	
		
			
				|  |  | -  setup_test(circular);
 | 
	
		
			
				|  |  | -  multiple_writers_single_reader(circular);
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Tests scenario where multiple writers and a single reader are using a log
 | 
	
		
			
				|  |  | -// that is configured to discard old records.
 | 
	
		
			
				|  |  | -void test_multiple_writers(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: multiple writers\n");
 | 
	
		
			
				|  |  | -  const int circular = 0;
 | 
	
		
			
				|  |  | -  setup_test(circular);
 | 
	
		
			
				|  |  | -  multiple_writers_single_reader(circular);
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -// Repeat the straddling records and multiple writers tests with a small log.
 | 
	
		
			
				|  |  | -void test_small_log(void) {
 | 
	
		
			
				|  |  | -  printf("Starting test: small log\n");
 | 
	
		
			
				|  |  | -  const int circular = 0;
 | 
	
		
			
				|  |  | -  census_log_initialize(0, circular);
 | 
	
		
			
				|  |  | -  size_t log_size = census_log_remaining_space();
 | 
	
		
			
				|  |  | -  GPR_ASSERT(log_size > 0);
 | 
	
		
			
				|  |  | -  fill_log(log_size, 0, circular);
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -  census_log_initialize(0, circular);
 | 
	
		
			
				|  |  | -  multiple_writers_single_reader(circular);
 | 
	
		
			
				|  |  | -  census_log_shutdown();
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -void test_performance(void) {
 | 
	
		
			
				|  |  | -  for (size_t write_size = 1; write_size < CENSUS_LOG_MAX_RECORD_SIZE;
 | 
	
		
			
				|  |  | -       write_size *= 2) {
 | 
	
		
			
				|  |  | -    setup_test(0);
 | 
	
		
			
				|  |  | -    gpr_timespec start_time = gpr_now(GPR_CLOCK_REALTIME);
 | 
	
		
			
				|  |  | -    int nrecords = 0;
 | 
	
		
			
				|  |  | -    while (1) {
 | 
	
		
			
				|  |  | -      void* record = census_log_start_write(write_size);
 | 
	
		
			
				|  |  | -      if (record == NULL) {
 | 
	
		
			
				|  |  | -        break;
 | 
	
		
			
				|  |  | -      }
 | 
	
		
			
				|  |  | -      census_log_end_write(record, write_size);
 | 
	
		
			
				|  |  | -      nrecords++;
 | 
	
		
			
				|  |  | -    }
 | 
	
		
			
				|  |  | -    gpr_timespec write_time =
 | 
	
		
			
				|  |  | -        gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time);
 | 
	
		
			
				|  |  | -    double write_time_micro =
 | 
	
		
			
				|  |  | -        (double)write_time.tv_sec * 1000000 + (double)write_time.tv_nsec / 1000;
 | 
	
		
			
				|  |  | -    census_log_shutdown();
 | 
	
		
			
				|  |  | -    printf(
 | 
	
		
			
				|  |  | -        "Wrote %d %d byte records in %.3g microseconds: %g records/us "
 | 
	
		
			
				|  |  | -        "(%g ns/record), %g gigabytes/s\n",
 | 
	
		
			
				|  |  | -        nrecords, (int)write_size, write_time_micro,
 | 
	
		
			
				|  |  | -        nrecords / write_time_micro, 1000 * write_time_micro / nrecords,
 | 
	
		
			
				|  |  | -        (double)((int)write_size * nrecords) / write_time_micro / 1000);
 | 
	
		
			
				|  |  | -  }
 | 
	
		
			
				|  |  | -}
 | 
	
		
			
				|  |  | -
 | 
	
		
			
				|  |  | -int main(int argc, char** argv) {
 | 
	
		
			
				|  |  | -  grpc_test_init(argc, argv);
 | 
	
		
			
				|  |  | -  gpr_time_init();
 | 
	
		
			
				|  |  | -  srand((unsigned)gpr_now(GPR_CLOCK_REALTIME).tv_nsec);
 | 
	
		
			
				|  |  | -  test_invalid_record_size();
 | 
	
		
			
				|  |  | -  test_end_write_with_different_size();
 | 
	
		
			
				|  |  | -  test_read_pending_record();
 | 
	
		
			
				|  |  | -  test_read_beyond_pending_record();
 | 
	
		
			
				|  |  | -  test_detached_while_reading();
 | 
	
		
			
				|  |  | -  test_fill_log_no_fragmentation();
 | 
	
		
			
				|  |  | -  test_fill_circular_log_no_fragmentation();
 | 
	
		
			
				|  |  | -  test_fill_log_with_straddling_records();
 | 
	
		
			
				|  |  | -  test_fill_circular_log_with_straddling_records();
 | 
	
		
			
				|  |  | -  test_small_log();
 | 
	
		
			
				|  |  | -  test_multiple_writers();
 | 
	
		
			
				|  |  | -  test_multiple_writers_circular_log();
 | 
	
		
			
				|  |  | -  test_performance();
 | 
	
		
			
				|  |  | -  return 0;
 | 
	
		
			
				|  |  | -}
 |