stress_client.rb 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. #!/usr/bin/env ruby
  2. # Copyright 2016 gRPC authors.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. require 'optparse'
  16. require 'thread'
  17. require_relative '../pb/test/client'
  18. require_relative './metrics_server'
  19. require_relative '../lib/grpc'
  20. class QpsGauge < Gauge
  21. @query_count
  22. @query_mutex
  23. @start_time
  24. def initialize
  25. @query_count = 0
  26. @query_mutex = Mutex.new
  27. @start_time = Time.now
  28. end
  29. def increment_queries
  30. @query_mutex.synchronize { @query_count += 1}
  31. end
  32. def get_name
  33. 'qps'
  34. end
  35. def get_type
  36. 'long'
  37. end
  38. def get_value
  39. (@query_mutex.synchronize { @query_count / (Time.now - @start_time) }).to_i
  40. end
  41. end
  42. def start_metrics_server(port)
  43. host = "0.0.0.0:#{port}"
  44. server = GRPC::RpcServer.new
  45. server.add_http2_port(host, :this_port_is_insecure)
  46. service = MetricsServiceImpl.new
  47. server.handle(service)
  48. server_thread = Thread.new { server.run_till_terminated }
  49. [server, service, server_thread]
  50. end
  51. StressArgs = Struct.new(:server_addresses, :test_cases, :duration,
  52. :channels_per_server, :concurrent_calls, :metrics_port)
  53. def start(stress_args)
  54. running = true
  55. threads = []
  56. qps_gauge = QpsGauge.new
  57. metrics_server, metrics_service, metrics_thread =
  58. start_metrics_server(stress_args.metrics_port)
  59. metrics_service.register_gauge(qps_gauge)
  60. stress_args.server_addresses.each do |address|
  61. stress_args.channels_per_server.times do
  62. client_args = Args.new
  63. client_args.host, client_args.port = address.split(':')
  64. client_args.secure = false
  65. client_args.test_case = ''
  66. stub = create_stub(client_args)
  67. named_tests = NamedTests.new(stub, client_args)
  68. stress_args.concurrent_calls.times do
  69. threads << Thread.new do
  70. while running
  71. named_tests.method(stress_args.test_cases.sample).call
  72. qps_gauge.increment_queries
  73. end
  74. end
  75. end
  76. end
  77. end
  78. if stress_args.duration >= 0
  79. sleep stress_args.duration
  80. running = false
  81. metrics_server.stop
  82. p "QPS: #{qps_gauge.get_value}"
  83. threads.each { |thd| thd.join; }
  84. end
  85. metrics_thread.join
  86. end
  87. def parse_stress_args
  88. stress_args = StressArgs.new
  89. stress_args.server_addresses = ['localhost:8080']
  90. stress_args.test_cases = []
  91. stress_args.duration = -1
  92. stress_args.channels_per_server = 1
  93. stress_args.concurrent_calls = 1
  94. stress_args.metrics_port = '8081'
  95. OptionParser.new do |opts|
  96. opts.on('--server_addresses [LIST]', Array) do |addrs|
  97. stress_args.server_addresses = addrs
  98. end
  99. opts.on('--test_cases cases', Array) do |cases|
  100. stress_args.test_cases = (cases.map do |item|
  101. split = item.split(':')
  102. [split[0]] * split[1].to_i
  103. end).reduce([], :+)
  104. end
  105. opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time|
  106. stress_args.duration = time
  107. end
  108. opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels|
  109. stress_args.channels_per_server = channels
  110. end
  111. opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs|
  112. stress_args.concurrent_calls = stubs
  113. end
  114. opts.on('--metrics_port [port]') do |port|
  115. stress_args.metrics_port = port
  116. end
  117. end.parse!
  118. stress_args
  119. end
  120. def main
  121. opts = parse_stress_args
  122. start(opts)
  123. end
  124. if __FILE__ == $0
  125. main
  126. end