|  | @@ -33,13 +33,14 @@ class LoadBalancerStatsService
 | 
	
		
			
				|  |  |      function getClientStats(\Grpc\Testing\LoadBalancerStatsRequest $request) {
 | 
	
		
			
				|  |  |          $num_rpcs = $request->getNumRpcs();
 | 
	
		
			
				|  |  |          $timeout_sec = $request->getTimeoutSec();
 | 
	
		
			
				|  |  | +        $rpcs_by_method = [];
 | 
	
		
			
				|  |  |          $rpcs_by_peer = [];
 | 
	
		
			
				|  |  | -        $num_failures = $num_rpcs;
 | 
	
		
			
				|  |  | +        $num_failures = 0;
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // Heavy limitation now: the server is blocking, until all
 | 
	
		
			
				|  |  |          // the necessary num_rpcs are finished, or timeout is reached
 | 
	
		
			
				|  |  |          global $client_thread;
 | 
	
		
			
				|  |  | -        $start_id = count($client_thread->results) + 1;
 | 
	
		
			
				|  |  | +        $start_id = $client_thread->num_results + 1;
 | 
	
		
			
				|  |  |          $end_id = $start_id + $num_rpcs;
 | 
	
		
			
				|  |  |          $now = hrtime(true);
 | 
	
		
			
				|  |  |          $timeout = $now[0] + ($now[1] / 1e9) + $timeout_sec;
 | 
	
	
		
			
				|  | @@ -50,7 +51,7 @@ class LoadBalancerStatsService
 | 
	
		
			
				|  |  |                  break;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |              // Thread variable seems to be read-only
 | 
	
		
			
				|  |  | -            $curr_id = count($client_thread->results);
 | 
	
		
			
				|  |  | +            $curr_id = $client_thread->num_results;
 | 
	
		
			
				|  |  |              if ($curr_id >= $end_id) {
 | 
	
		
			
				|  |  |                  break;
 | 
	
		
			
				|  |  |              }
 | 
	
	
		
			
				|  | @@ -58,19 +59,52 @@ class LoadBalancerStatsService
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          // Tally up results
 | 
	
		
			
				|  |  | -        $end_id = min($end_id, count($client_thread->results));
 | 
	
		
			
				|  |  | -        for ($i = $start_id; $i < $end_id; $i++) {
 | 
	
		
			
				|  |  | -            $hostname = $client_thread->results[$i];
 | 
	
		
			
				|  |  | -            if ($hostname) {
 | 
	
		
			
				|  |  | -                $num_failures -= 1;
 | 
	
		
			
				|  |  | -                if (!array_key_exists($hostname, $rpcs_by_peer)) {
 | 
	
		
			
				|  |  | -                    $rpcs_by_peer[$hostname] = 0;
 | 
	
		
			
				|  |  | +        $end_id = min($end_id, $client_thread->num_results);
 | 
	
		
			
				|  |  | +        // "$client_thread->results" will be in the form of
 | 
	
		
			
				|  |  | +        // [
 | 
	
		
			
				|  |  | +        //   'rpc1' => [
 | 
	
		
			
				|  |  | +        //     'hostname1', '', 'hostname2', 'hostname1', '', ...
 | 
	
		
			
				|  |  | +        //   ],
 | 
	
		
			
				|  |  | +        //   'rpc2' => [
 | 
	
		
			
				|  |  | +        //     '', 'hostname1', 'hostname2', '', 'hostname2', ...
 | 
	
		
			
				|  |  | +        //   ],
 | 
	
		
			
				|  |  | +        // ]
 | 
	
		
			
				|  |  | +        foreach ($client_thread->results as $rpc => $results) {
 | 
	
		
			
				|  |  | +            // initialize, can always start from scratch here
 | 
	
		
			
				|  |  | +            $rpcs_by_method[$rpc] = [];
 | 
	
		
			
				|  |  | +            for ($i = $start_id; $i < $end_id; $i++) {
 | 
	
		
			
				|  |  | +                $hostname = $results[$i];
 | 
	
		
			
				|  |  | +                if ($hostname) {
 | 
	
		
			
				|  |  | +                    // initialize in case we haven't seen this hostname
 | 
	
		
			
				|  |  | +                    // before
 | 
	
		
			
				|  |  | +                    if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) {
 | 
	
		
			
				|  |  | +                        $rpcs_by_method[$rpc][$hostname] = 0;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    if (!array_key_exists($hostname, $rpcs_by_peer)) {
 | 
	
		
			
				|  |  | +                        $rpcs_by_peer[$hostname] = 0;
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    // increment the remote hostname distribution histogram
 | 
	
		
			
				|  |  | +                    // both by overall, and broken down per RPC
 | 
	
		
			
				|  |  | +                    $rpcs_by_method[$rpc][$hostname] += 1;
 | 
	
		
			
				|  |  | +                    $rpcs_by_peer[$hostname] += 1;
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    // $num_failures here are counted per individual RPC
 | 
	
		
			
				|  |  | +                    $num_failures += 1;
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                $rpcs_by_peer[$hostname] += 1;
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // Convert our hashmaps above into protobuf objects
 | 
	
		
			
				|  |  |          $response = new Grpc\Testing\LoadBalancerStatsResponse();
 | 
	
		
			
				|  |  | +        $rpcs_by_method_map = [];
 | 
	
		
			
				|  |  | +        foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) {
 | 
	
		
			
				|  |  | +            $rpcs_by_peer_proto_obj
 | 
	
		
			
				|  |  | +                = new Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer();
 | 
	
		
			
				|  |  | +            $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method);
 | 
	
		
			
				|  |  | +            $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj;
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |          $response->setRpcsByPeer($rpcs_by_peer);
 | 
	
		
			
				|  |  | +        $response->setRpcsByMethod($rpcs_by_method_map);
 | 
	
		
			
				|  |  |          $response->setNumFailures($num_failures);
 | 
	
		
			
				|  |  |          return $response;
 | 
	
		
			
				|  |  |      }
 | 
	
	
		
			
				|  | @@ -83,28 +117,74 @@ class ClientThread extends Thread {
 | 
	
		
			
				|  |  |      private $target_seconds_between_rpcs_;
 | 
	
		
			
				|  |  |      private $fail_on_failed_rpcs_;
 | 
	
		
			
				|  |  |      private $autoload_path_;
 | 
	
		
			
				|  |  | +    private $TIMEOUT_US = 30 * 1e6; // 30 seconds
 | 
	
		
			
				|  |  | +    public $num_results = 0;
 | 
	
		
			
				|  |  |      public $results;
 | 
	
		
			
				|  |  | -    
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |      public function __construct($server_address, $qps, $fail_on_failed_rpcs,
 | 
	
		
			
				|  |  | +                                $rpcs_to_send, $metadata_to_send,
 | 
	
		
			
				|  |  |                                  $autoload_path) {
 | 
	
		
			
				|  |  |          $this->server_address_ = $server_address;
 | 
	
		
			
				|  |  |          $this->target_seconds_between_rpcs_ = 1.0 / $qps;
 | 
	
		
			
				|  |  |          $this->fail_on_failed_rpcs_ = $fail_on_failed_rpcs;
 | 
	
		
			
				|  |  | +        $this->rpcs_to_send = explode(',', $rpcs_to_send);
 | 
	
		
			
				|  |  | +        // Convert input in the form of
 | 
	
		
			
				|  |  | +        //   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
 | 
	
		
			
				|  |  | +        // into
 | 
	
		
			
				|  |  | +        //   [
 | 
	
		
			
				|  |  | +        //     'rpc1' => [
 | 
	
		
			
				|  |  | +        //       'k1' => 'v1',
 | 
	
		
			
				|  |  | +        //       'k3' => 'v3',
 | 
	
		
			
				|  |  | +        //     ],
 | 
	
		
			
				|  |  | +        //     'rpc2' => [
 | 
	
		
			
				|  |  | +        //       'k2' => 'v2'
 | 
	
		
			
				|  |  | +        //     ],
 | 
	
		
			
				|  |  | +        //   ]
 | 
	
		
			
				|  |  | +        $this->metadata_to_send = [];
 | 
	
		
			
				|  |  | +        if ($_all_metadata = explode(',', $metadata_to_send)) {
 | 
	
		
			
				|  |  | +            foreach ($_all_metadata as $one_metadata_pair) {
 | 
	
		
			
				|  |  | +                list($rpc,
 | 
	
		
			
				|  |  | +                     $metadata_key,
 | 
	
		
			
				|  |  | +                     $metadata_value) = explode(':', $one_metadata_pair);
 | 
	
		
			
				|  |  | +                // initialize in case we haven't seen this rpc before
 | 
	
		
			
				|  |  | +                if (!array_key_exists($rpc, $this->metadata_to_send)) {
 | 
	
		
			
				|  |  | +                    $this->metadata_to_send[$rpc] = [];
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                $this->metadata_to_send[$rpc][$metadata_key]
 | 
	
		
			
				|  |  | +                    = $metadata_value;
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  |          $this->autoload_path_ = $autoload_path;
 | 
	
		
			
				|  |  | +        $this->simple_request = new Grpc\Testing\SimpleRequest();
 | 
	
		
			
				|  |  | +        $this->empty_request = new Grpc\Testing\EmptyMessage();
 | 
	
		
			
				|  |  |          $this->results = [];
 | 
	
		
			
				|  |  | +        foreach ($this->rpcs_to_send as $rpc) {
 | 
	
		
			
				|  |  | +            $this->results[$rpc] = [];
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    public function sendUnaryCall($stub, $metadata) {
 | 
	
		
			
				|  |  | +        return $stub->UnaryCall($this->simple_request,
 | 
	
		
			
				|  |  | +                                $metadata,
 | 
	
		
			
				|  |  | +                                ['timeout' => $this->TIMEOUT_US]);
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    public function sendEmptyCall($stub, $metadata) {
 | 
	
		
			
				|  |  | +        return $stub->EmptyCall($this->empty_request,
 | 
	
		
			
				|  |  | +                                $metadata,
 | 
	
		
			
				|  |  | +                                ['timeout' => $this->TIMEOUT_US]);
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      public function run() {
 | 
	
		
			
				|  |  |          // Autoloaded classes do not get inherited in threads.
 | 
	
		
			
				|  |  |          // Hence we need to do this.
 | 
	
		
			
				|  |  |          require_once($this->autoload_path_);
 | 
	
		
			
				|  |  | -        $TIMEOUT_US = 30 * 1e6; // 30 seconds
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |          $stub = new Grpc\Testing\TestServiceClient($this->server_address_, [
 | 
	
		
			
				|  |  |              'credentials' => Grpc\ChannelCredentials::createInsecure()
 | 
	
		
			
				|  |  |          ]);
 | 
	
		
			
				|  |  | -        $request = new Grpc\Testing\SimpleRequest();
 | 
	
		
			
				|  |  | -        $target_next_start_us = hrtime(true) / 1000; # hrtime returns nanoseconds
 | 
	
		
			
				|  |  | +        # hrtime returns nanoseconds
 | 
	
		
			
				|  |  | +        $target_next_start_us = hrtime(true) / 1000;
 | 
	
		
			
				|  |  |          while (true) {
 | 
	
		
			
				|  |  |              $now_us = hrtime(true) / 1000;
 | 
	
		
			
				|  |  |              $sleep_us = $target_next_start_us - $now_us;
 | 
	
	
		
			
				|  | @@ -121,18 +201,43 @@ class ClientThread extends Thread {
 | 
	
		
			
				|  |  |                          ($this->target_seconds_between_rpcs_ * 1e6);
 | 
	
		
			
				|  |  |                  usleep($sleep_us);
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | -            list($response, $status)
 | 
	
		
			
				|  |  | -                = $stub->UnaryCall($request, [],
 | 
	
		
			
				|  |  | -                                   ['timeout' => $TIMEOUT_US])->wait();
 | 
	
		
			
				|  |  | -            if ($status->code == Grpc\STATUS_OK) {
 | 
	
		
			
				|  |  | -                $this->results[] = $response->getHostname();
 | 
	
		
			
				|  |  | -            } else {
 | 
	
		
			
				|  |  | -                if ($this->fail_on_failed_rpcs_) {
 | 
	
		
			
				|  |  | -                    throw new Exception('UnaryCall failed with status '
 | 
	
		
			
				|  |  | -                                        . $status->code);
 | 
	
		
			
				|  |  | +            foreach ($this->rpcs_to_send as $rpc) {
 | 
	
		
			
				|  |  | +                $metadata = array_key_exists(
 | 
	
		
			
				|  |  | +                    $rpc, $this->metadata_to_send) ?
 | 
	
		
			
				|  |  | +                          $this->metadata_to_send[$rpc] : [];
 | 
	
		
			
				|  |  | +                // This copy is somehow necessary because
 | 
	
		
			
				|  |  | +                // $this->metadata_to_send[$rpc] somehow becomes a
 | 
	
		
			
				|  |  | +                // Volatile object, instead of an associative array.
 | 
	
		
			
				|  |  | +                $metadata_array = [];
 | 
	
		
			
				|  |  | +                foreach ($metadata as $key => $value) {
 | 
	
		
			
				|  |  | +                    $metadata_array[$key] = [$value];
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                $call = null;
 | 
	
		
			
				|  |  | +                if ($rpc == 'UnaryCall') {
 | 
	
		
			
				|  |  | +                    $call = $this->sendUnaryCall($stub, $metadata_array);
 | 
	
		
			
				|  |  | +                } else if ($rpc == 'EmptyCall') {
 | 
	
		
			
				|  |  | +                    $call = $this->sendEmptyCall($stub, $metadata_array);
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    throw new Exception("Unhandled rpc $rpc");
 | 
	
		
			
				|  |  | +                }
 | 
	
		
			
				|  |  | +                // the remote peer is being returned as part of the
 | 
	
		
			
				|  |  | +                // initial metadata, according to the test spec
 | 
	
		
			
				|  |  | +                $initial_metadata = $call->getMetadata();
 | 
	
		
			
				|  |  | +                list($response, $status) = $call->wait();
 | 
	
		
			
				|  |  | +                if ($status->code == Grpc\STATUS_OK &&
 | 
	
		
			
				|  |  | +                    array_key_exists('hostname', $initial_metadata)) {
 | 
	
		
			
				|  |  | +                    $this->results[$rpc][] = $initial_metadata['hostname'][0];
 | 
	
		
			
				|  |  | +                } else {
 | 
	
		
			
				|  |  | +                    if ($this->fail_on_failed_rpcs_) {
 | 
	
		
			
				|  |  | +                        throw new Exception("$rpc failed with status "
 | 
	
		
			
				|  |  | +                                            . $status->code);
 | 
	
		
			
				|  |  | +                    }
 | 
	
		
			
				|  |  | +                    $this->results[$rpc][] = "";
 | 
	
		
			
				|  |  |                  }
 | 
	
		
			
				|  |  | -                $this->results[] = "";
 | 
	
		
			
				|  |  |              }
 | 
	
		
			
				|  |  | +            // $num_results here is only incremented when the group of
 | 
	
		
			
				|  |  | +            // all $rpcs_to_send are done.
 | 
	
		
			
				|  |  | +            $this->num_results++;
 | 
	
		
			
				|  |  |          }
 | 
	
		
			
				|  |  |      }
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -145,10 +250,14 @@ class ClientThread extends Thread {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  // Note: num_channels are currently ignored for now
 | 
	
		
			
				|  |  |  $args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
 | 
	
		
			
				|  |  | +                    'rpc:', 'metadata:',
 | 
	
		
			
				|  |  |                      'server:', 'stats_port:', 'qps:']);
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  $client_thread = new ClientThread($args['server'], $args['qps'],
 | 
	
		
			
				|  |  |                                    $args['fail_on_failed_rpcs'],
 | 
	
		
			
				|  |  | +                                  (empty($args['rpc']) ? 'UnaryCall'
 | 
	
		
			
				|  |  | +                                   : $args['rpc']),
 | 
	
		
			
				|  |  | +                                  $args['metadata'],
 | 
	
		
			
				|  |  |                                    $autoload_path);
 | 
	
		
			
				|  |  |  $client_thread->start();
 | 
	
		
			
				|  |  |  
 |