Jelajahi Sumber

Fix infinite streams in qps_test

Craig Tiller 8 tahun lalu
induk
melakukan
9354720625
1 mengubah file dengan 16 tambahan dan 8 penghapusan
  1. 16 8
      test/cpp/qps/client_sync.cc

+ 16 - 8
test/cpp/qps/client_sync.cc

@@ -153,16 +153,22 @@ class SynchronousStreamingClient final : public SynchronousClient {
     StartThreads(num_threads_);
     StartThreads(num_threads_);
   }
   }
   ~SynchronousStreamingClient() {
   ~SynchronousStreamingClient() {
+    std::vector<std::thread> cleanup_threads;
     for (size_t i = 0; i < num_threads_; i++) {
     for (size_t i = 0; i < num_threads_; i++) {
-      auto stream = &stream_[i];
-      if (*stream) {
-        (*stream)->WritesDone();
-        Status s = (*stream)->Finish();
-        if (!s.ok()) {
-          gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i,
-                  s.error_message().c_str());
+      cleanup_threads.emplace_back([this, i]() {
+        auto stream = &stream_[i];
+        if (*stream) {
+          (*stream)->WritesDone();
+          Status s = (*stream)->Finish();
+          if (!s.ok()) {
+            gpr_log(GPR_ERROR, "Stream %" PRIuPTR " received an error %s", i,
+                    s.error_message().c_str());
+          }
         }
         }
-      }
+      });
+    }
+    for (size_t i=0; i<num_threads_; i++) {
+      cleanup_threads[i].join();
     }
     }
   }
   }
 
 
@@ -179,6 +185,8 @@ class SynchronousStreamingClient final : public SynchronousClient {
       if ((messages_per_stream_ != 0) &&
       if ((messages_per_stream_ != 0) &&
           (++messages_issued_[thread_idx] < messages_per_stream_)) {
           (++messages_issued_[thread_idx] < messages_per_stream_)) {
         return true;
         return true;
+      } else if (messages_per_stream_ == 0) {
+        return true;
       } else {
       } else {
         // Fall through to the below resetting code after finish
         // Fall through to the below resetting code after finish
       }
       }