/*
 *
 * Copyright 2015, Google Inc.
 * All rights reserved.
* * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are * met: * *     * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. *     * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following disclaimer * in the documentation and/or other materials provided with the * distribution. *     * Neither the name of Google Inc. nor the names of its * contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * */#include "src/core/ext/census/census_log.h"#include <grpc/support/cpu.h>#include <grpc/support/log.h>#include <grpc/support/port_platform.h>#include <grpc/support/sync.h>#include <grpc/support/thd.h>#include <grpc/support/time.h>#include <grpc/support/useful.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include "test/core/util/test_config.h"/* Fills in 'record' of size 'size'. Each byte in record is filled in with the   same value. The value is extracted from 'record' pointer. 
*/static void write_record(char *record, size_t size) {  char data = (uintptr_t)record % 255;  memset(record, data, size);}/* Reads fixed size records. Returns the number of records read in   'num_records'. */static void read_records(size_t record_size, const char *buffer,                         size_t buffer_size, int32_t *num_records) {  int32_t ix;  GPR_ASSERT(buffer_size >= record_size);  GPR_ASSERT(buffer_size % record_size == 0);  *num_records = buffer_size / record_size;  for (ix = 0; ix < *num_records; ++ix) {    size_t jx;    const char *record = buffer + (record_size * ix);    char data = (uintptr_t)record % 255;    for (jx = 0; jx < record_size; ++jx) {      GPR_ASSERT(data == record[jx]);    }  }}/* Tries to write the specified number of records. Stops when the log gets   full. Returns the number of records written. Spins for random   number of times, up to 'max_spin_count', between writes. */static size_t write_records_to_log(int writer_id, int32_t record_size,                                   int32_t num_records,                                   int32_t max_spin_count) {  int32_t ix;  int counter = 0;  for (ix = 0; ix < num_records; ++ix) {    int32_t jx;    int32_t spin_count = max_spin_count ? rand() % max_spin_count : 0;    char *record;    if (counter++ == num_records / 10) {      printf("   Writer %d: %d out of %d written\n", writer_id, ix,             num_records);      counter = 0;    }    record = (char *)(census_log_start_write(record_size));    if (record == NULL) {      return ix;    }    write_record(record, record_size);    census_log_end_write(record, record_size);    for (jx = 0; jx < spin_count; ++jx) {      GPR_ASSERT(jx >= 0);    }  }  return num_records;}/* Performs a single read iteration. Returns the number of records read. 
*/static size_t perform_read_iteration(size_t record_size) {  const void *read_buffer = NULL;  size_t bytes_available;  size_t records_read = 0;  census_log_init_reader();  while ((read_buffer = census_log_read_next(&bytes_available))) {    int32_t num_records = 0;    read_records(record_size, (const char *)read_buffer, bytes_available,                 &num_records);    records_read += num_records;  }  return records_read;}/* Asserts that the log is empty. */static void assert_log_empty(void) {  size_t bytes_available;  census_log_init_reader();  GPR_ASSERT(census_log_read_next(&bytes_available) == NULL);}/* Given log size and record size, computes the minimum usable space. */static int32_t min_usable_space(size_t log_size, size_t record_size) {  int32_t usable_space;  int32_t num_blocks =      GPR_MAX(log_size / CENSUS_LOG_MAX_RECORD_SIZE, gpr_cpu_num_cores());  int32_t waste_per_block = CENSUS_LOG_MAX_RECORD_SIZE % record_size;  /* In the worst case, all except one core-local block is full. */  int32_t num_full_blocks = num_blocks - 1;  usable_space = (int32_t)log_size -                 (num_full_blocks * CENSUS_LOG_MAX_RECORD_SIZE) -                 ((num_blocks - num_full_blocks) * waste_per_block);  GPR_ASSERT(usable_space > 0);  return usable_space;}/* Fills the log and verifies data. If 'no fragmentation' is true, records   are sized such that CENSUS_LOG_2_MAX_RECORD_SIZE is a multiple of record   size. If not a circular log, verifies that the number of records written   match the number of records read. 
*/static void fill_log(size_t log_size, int no_fragmentation, int circular_log) {  int size;  int32_t records_written;  int32_t usable_space;  int32_t records_read;  if (no_fragmentation) {    int log2size = rand() % (CENSUS_LOG_2_MAX_RECORD_SIZE + 1);    size = (1 << log2size);  } else {    while (1) {      size = 1 + (rand() % CENSUS_LOG_MAX_RECORD_SIZE);      if (CENSUS_LOG_MAX_RECORD_SIZE % size) {        break;      }    }  }  printf("   Fill record size: %d\n", size);  records_written = write_records_to_log(      0 /* writer id */, size, (log_size / size) * 2, 0 /* spin count */);  usable_space = min_usable_space(log_size, size);  GPR_ASSERT(records_written * size >= usable_space);  records_read = perform_read_iteration(size);  if (!circular_log) {    GPR_ASSERT(records_written == records_read);  }  assert_log_empty();}/* Structure to pass args to writer_thread */typedef struct writer_thread_args {  /* Index of this thread in the writers vector. */  int index;  /* Record size. */  size_t record_size;  /* Number of records to write. */  int32_t num_records;  /* Used to signal when writer is complete */  gpr_cv *done;  gpr_mu *mu;  int *count;} writer_thread_args;/* Writes the given number of records of random size (up to kMaxRecordSize) and   random data to the specified log. */static void writer_thread(void *arg) {  writer_thread_args *args = (writer_thread_args *)arg;  /* Maximum number of times to spin between writes. */  static const int32_t MAX_SPIN_COUNT = 50;  int records_written = 0;  printf("   Writer: %d\n", args->index);  while (records_written < args->num_records) {    records_written += write_records_to_log(args->index, args->record_size,                                            args->num_records - records_written,                                            MAX_SPIN_COUNT);    if (records_written < args->num_records) {      /* Ran out of log space. Sleep for a bit and let the reader catch up.         This should never happen for circular logs. 
*/      printf("   Writer stalled due to out-of-space: %d out of %d written\n",             records_written, args->num_records);      gpr_sleep_until(GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));    }  }  /* Done. Decrement count and signal. */  gpr_mu_lock(args->mu);  (*args->count)--;  gpr_cv_broadcast(args->done);  printf("   Writer done: %d\n", args->index);  gpr_mu_unlock(args->mu);}/* struct to pass args to reader_thread */typedef struct reader_thread_args {  /* Record size. */  size_t record_size;  /* Interval between read iterations. */  int32_t read_iteration_interval_in_msec;  /* Total number of records. */  int32_t total_records;  /* Signalled when reader should stop. */  gpr_cv stop;  int stop_flag;  /* Used to signal when reader has finished */  gpr_cv *done;  gpr_mu *mu;  int running;} reader_thread_args;/* Reads and verifies the specified number of records. Reader can also be   stopped via gpr_cv_signal(&args->stop). Sleeps for 'read_interval_in_msec'   between read iterations. */static void reader_thread(void *arg) {  int32_t records_read = 0;  reader_thread_args *args = (reader_thread_args *)arg;  int32_t num_iterations = 0;  gpr_timespec interval;  int counter = 0;  printf("   Reader starting\n");  interval = gpr_time_from_micros(      (int64_t)args->read_iteration_interval_in_msec * 1000, GPR_TIMESPAN);  gpr_mu_lock(args->mu);  while (!args->stop_flag && records_read < args->total_records) {    gpr_cv_wait(&args->stop, args->mu, interval);    if (!args->stop_flag) {      records_read += perform_read_iteration(args->record_size);      GPR_ASSERT(records_read <= args->total_records);      if (counter++ == 100000) {        printf("   Reader: %d out of %d read\n", records_read,               args->total_records);        counter = 0;      }      ++num_iterations;    }  }  /* Done */  args->running = 0;  gpr_cv_broadcast(args->done);  printf("   Reader: records: %d, iterations: %d\n", records_read,         num_iterations);  gpr_mu_unlock(args->mu);}/* Creates 
NUM_WRITERS writers where each writer writes NUM_RECORDS_PER_WRITER   records. Also, starts a reader that iterates over and reads blocks every   READ_ITERATION_INTERVAL_IN_MSEC. *//* Number of writers. */#define NUM_WRITERS 5static void multiple_writers_single_reader(int circular_log) {  /* Sleep interval between read iterations. */  static const int32_t READ_ITERATION_INTERVAL_IN_MSEC = 10;  /* Number of records written by each writer. */  static const int32_t NUM_RECORDS_PER_WRITER = 10 * 1024 * 1024;  /* Maximum record size. */  static const size_t MAX_RECORD_SIZE = 10;  int ix;  gpr_thd_id id;  gpr_cv writers_done;  int writers_count = NUM_WRITERS;  gpr_mu writers_mu; /* protects writers_done and writers_count */  writer_thread_args writers[NUM_WRITERS];  gpr_cv reader_done;  gpr_mu reader_mu; /* protects reader_done and reader.running */  reader_thread_args reader;  int32_t record_size = 1 + rand() % MAX_RECORD_SIZE;  printf("   Record size: %d\n", record_size);  /* Create and start writers. */  gpr_cv_init(&writers_done);  gpr_mu_init(&writers_mu);  for (ix = 0; ix < NUM_WRITERS; ++ix) {    writers[ix].index = ix;    writers[ix].record_size = record_size;    writers[ix].num_records = NUM_RECORDS_PER_WRITER;    writers[ix].done = &writers_done;    writers[ix].count = &writers_count;    writers[ix].mu = &writers_mu;    gpr_thd_new(&id, &writer_thread, &writers[ix], NULL);  }  /* Start reader. */  reader.record_size = record_size;  reader.read_iteration_interval_in_msec = READ_ITERATION_INTERVAL_IN_MSEC;  reader.total_records = NUM_WRITERS * NUM_RECORDS_PER_WRITER;  reader.stop_flag = 0;  gpr_cv_init(&reader.stop);  gpr_cv_init(&reader_done);  reader.done = &reader_done;  gpr_mu_init(&reader_mu);  reader.mu = &reader_mu;  reader.running = 1;  gpr_thd_new(&id, &reader_thread, &reader, NULL);  /* Wait for writers to finish. 
*/  gpr_mu_lock(&writers_mu);  while (writers_count != 0) {    gpr_cv_wait(&writers_done, &writers_mu, gpr_inf_future(GPR_CLOCK_REALTIME));  }  gpr_mu_unlock(&writers_mu);  gpr_mu_destroy(&writers_mu);  gpr_cv_destroy(&writers_done);  gpr_mu_lock(&reader_mu);  if (circular_log) {    /* Stop reader. */    reader.stop_flag = 1;    gpr_cv_signal(&reader.stop);  }  /* wait for reader to finish */  while (reader.running) {    gpr_cv_wait(&reader_done, &reader_mu, gpr_inf_future(GPR_CLOCK_REALTIME));  }  if (circular_log) {    /* Assert that there were no out-of-space errors. */    GPR_ASSERT(0 == census_log_out_of_space_count());  }  gpr_mu_unlock(&reader_mu);  gpr_mu_destroy(&reader_mu);  gpr_cv_destroy(&reader_done);  printf("   Reader: finished\n");}/* Log sizes to use for all tests. */#define LOG_SIZE_IN_MB 1#define LOG_SIZE_IN_BYTES (LOG_SIZE_IN_MB << 20)static void setup_test(int circular_log) {  census_log_initialize(LOG_SIZE_IN_MB, circular_log);  GPR_ASSERT(census_log_remaining_space() == LOG_SIZE_IN_BYTES);}/* Attempts to create a record of invalid size (size >   CENSUS_LOG_MAX_RECORD_SIZE). */void test_invalid_record_size(void) {  static const size_t INVALID_SIZE = CENSUS_LOG_MAX_RECORD_SIZE + 1;  static const size_t VALID_SIZE = 1;  void *record;  printf("Starting test: invalid record size\n");  setup_test(0);  record = census_log_start_write(INVALID_SIZE);  GPR_ASSERT(record == NULL);  /* Now try writing a valid record. */  record = census_log_start_write(VALID_SIZE);  GPR_ASSERT(record != NULL);  census_log_end_write(record, VALID_SIZE);  /* Verifies that available space went down by one block. In theory, this     check can fail if the thread is context switched to a new CPU during the     start_write execution (multiple blocks get allocated), but this has not     been observed in practice. 
*/  GPR_ASSERT(LOG_SIZE_IN_BYTES - CENSUS_LOG_MAX_RECORD_SIZE ==             census_log_remaining_space());  census_log_shutdown();}/* Tests end_write() with a different size than what was specified in   start_write(). */void test_end_write_with_different_size(void) {  static const size_t START_WRITE_SIZE = 10;  static const size_t END_WRITE_SIZE = 7;  void *record_written;  const void *record_read;  size_t bytes_available;  printf("Starting test: end write with different size\n");  setup_test(0);  record_written = census_log_start_write(START_WRITE_SIZE);  GPR_ASSERT(record_written != NULL);  census_log_end_write(record_written, END_WRITE_SIZE);  census_log_init_reader();  record_read = census_log_read_next(&bytes_available);  GPR_ASSERT(record_written == record_read);  GPR_ASSERT(END_WRITE_SIZE == bytes_available);  assert_log_empty();  census_log_shutdown();}/* Verifies that pending records are not available via read_next(). */void test_read_pending_record(void) {  static const size_t PR_RECORD_SIZE = 1024;  size_t bytes_available;  const void *record_read;  void *record_written;  printf("Starting test: read pending record\n");  setup_test(0);  /* Start a write. */  record_written = census_log_start_write(PR_RECORD_SIZE);  GPR_ASSERT(record_written != NULL);  /* As write is pending, read should fail. */  census_log_init_reader();  record_read = census_log_read_next(&bytes_available);  GPR_ASSERT(record_read == NULL);  /* A read followed by end_write() should succeed. */  census_log_end_write(record_written, PR_RECORD_SIZE);  census_log_init_reader();  record_read = census_log_read_next(&bytes_available);  GPR_ASSERT(record_written == record_read);  GPR_ASSERT(PR_RECORD_SIZE == bytes_available);  assert_log_empty();  census_log_shutdown();}/* Tries reading beyond pending write. */void test_read_beyond_pending_record(void) {  /* Start a write. 
*/  uint32_t incomplete_record_size = 10;  uint32_t complete_record_size = 20;  size_t bytes_available;  void *complete_record;  const void *record_read;  void *incomplete_record;  printf("Starting test: read beyond pending record\n");  setup_test(0);  incomplete_record = census_log_start_write(incomplete_record_size);  GPR_ASSERT(incomplete_record != NULL);  complete_record = census_log_start_write(complete_record_size);  GPR_ASSERT(complete_record != NULL);  GPR_ASSERT(complete_record != incomplete_record);  census_log_end_write(complete_record, complete_record_size);  /* Now iterate over blocks to read completed records. */  census_log_init_reader();  record_read = census_log_read_next(&bytes_available);  GPR_ASSERT(complete_record == record_read);  GPR_ASSERT(complete_record_size == bytes_available);  /* Complete first record. */  census_log_end_write(incomplete_record, incomplete_record_size);  /* Have read past the incomplete record, so read_next() should return NULL. */  /* NB: this test also assumes our thread did not get switched to a different     CPU between the two start_write calls */  record_read = census_log_read_next(&bytes_available);  GPR_ASSERT(record_read == NULL);  /* Reset reader to get the newly completed record. */  census_log_init_reader();  record_read = census_log_read_next(&bytes_available);  GPR_ASSERT(incomplete_record == record_read);  GPR_ASSERT(incomplete_record_size == bytes_available);  assert_log_empty();  census_log_shutdown();}/* Tests scenario where block being read is detached from a core and put on the   dirty list. */void test_detached_while_reading(void) {  static const size_t DWR_RECORD_SIZE = 10;  size_t bytes_available;  const void *record_read;  void *record_written;  uint32_t block_read = 0;  printf("Starting test: detached while reading\n");  setup_test(0);  /* Start a write. 
*/  record_written = census_log_start_write(DWR_RECORD_SIZE);  GPR_ASSERT(record_written != NULL);  census_log_end_write(record_written, DWR_RECORD_SIZE);  /* Read this record. */  census_log_init_reader();  record_read = census_log_read_next(&bytes_available);  GPR_ASSERT(record_read != NULL);  GPR_ASSERT(DWR_RECORD_SIZE == bytes_available);  /* Now fill the log. This will move the block being read from core-local     array to the dirty list. */  while ((record_written = census_log_start_write(DWR_RECORD_SIZE))) {    census_log_end_write(record_written, DWR_RECORD_SIZE);  }  /* In this iteration, read_next() should only traverse blocks in the     core-local array. Therefore, we expect at most gpr_cpu_num_cores() more     blocks. As log is full, if read_next() is traversing the dirty list, we     will get more than gpr_cpu_num_cores() blocks. */  while ((record_read = census_log_read_next(&bytes_available))) {    ++block_read;    GPR_ASSERT(block_read <= gpr_cpu_num_cores());  }  census_log_shutdown();}/* Fills non-circular log with records sized such that size is a multiple of   CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation). */void test_fill_log_no_fragmentation(void) {  const int circular = 0;  printf("Starting test: fill log no fragmentation\n");  setup_test(circular);  fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);  census_log_shutdown();}/* Fills circular log with records sized such that size is a multiple of   CENSUS_LOG_MAX_RECORD_SIZE (no per-block fragmentation). */void test_fill_circular_log_no_fragmentation(void) {  const int circular = 1;  printf("Starting test: fill circular log no fragmentation\n");  setup_test(circular);  fill_log(LOG_SIZE_IN_BYTES, 1 /* no fragmentation */, circular);  census_log_shutdown();}/* Fills non-circular log with records that may straddle end of a block. 
*/void test_fill_log_with_straddling_records(void) {  const int circular = 0;  printf("Starting test: fill log with straddling records\n");  setup_test(circular);  fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);  census_log_shutdown();}/* Fills circular log with records that may straddle end of a block. */void test_fill_circular_log_with_straddling_records(void) {  const int circular = 1;  printf("Starting test: fill circular log with straddling records\n");  setup_test(circular);  fill_log(LOG_SIZE_IN_BYTES, 0 /* block straddling records */, circular);  census_log_shutdown();}/* Tests scenario where multiple writers and a single reader are using a log   that is configured to discard old records. */void test_multiple_writers_circular_log(void) {  const int circular = 1;  printf("Starting test: multiple writers circular log\n");  setup_test(circular);  multiple_writers_single_reader(circular);  census_log_shutdown();}/* Tests scenario where multiple writers and a single reader are using a log   that is configured to discard old records. */void test_multiple_writers(void) {  const int circular = 0;  printf("Starting test: multiple writers\n");  setup_test(circular);  multiple_writers_single_reader(circular);  census_log_shutdown();}/* Repeat the straddling records and multiple writers tests with a small log. 
*/void test_small_log(void) {  size_t log_size;  const int circular = 0;  printf("Starting test: small log\n");  census_log_initialize(0, circular);  log_size = census_log_remaining_space();  GPR_ASSERT(log_size > 0);  fill_log(log_size, 0, circular);  census_log_shutdown();  census_log_initialize(0, circular);  multiple_writers_single_reader(circular);  census_log_shutdown();}void test_performance(void) {  int write_size = 1;  for (; write_size < CENSUS_LOG_MAX_RECORD_SIZE; write_size *= 2) {    gpr_timespec write_time;    gpr_timespec start_time;    double write_time_micro = 0.0;    int nrecords = 0;    setup_test(0);    start_time = gpr_now(GPR_CLOCK_REALTIME);    while (1) {      void *record = census_log_start_write(write_size);      if (record == NULL) {        break;      }      census_log_end_write(record, write_size);      nrecords++;    }    write_time = gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start_time);    write_time_micro = write_time.tv_sec * 1000000 + write_time.tv_nsec / 1000;    census_log_shutdown();    printf(        "Wrote %d %d byte records in %.3g microseconds: %g records/us "        "(%g ns/record), %g gigabytes/s\n",        nrecords, write_size, write_time_micro, nrecords / write_time_micro,        1000 * write_time_micro / nrecords,        (write_size * nrecords) / write_time_micro / 1000);  }}
 |