# Copyright 2019 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""An example of multiprocessing concurrency with gRPC."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import atexit
import logging
import multiprocessing
import operator
import sys

import grpc

from examples.python.multiprocessing import prime_pb2
from examples.python.multiprocessing import prime_pb2_grpc

_PROCESS_COUNT = 8
_MAXIMUM_CANDIDATE = 10000

# Each worker process initializes a single channel after forking.
# It's regrettable, but to ensure that each subprocess only has to instantiate
# a single channel to be reused across all RPCs, we use globals.
_worker_channel_singleton = None
_worker_stub_singleton = None

_LOGGER = logging.getLogger(__name__)


def _shutdown_worker():
    # Registered via atexit in each worker so the channel is closed cleanly
    # when the worker process exits.
    _LOGGER.info('Shutting worker process down.')
    if _worker_channel_singleton is not None:
        _worker_channel_singleton.close()


def _initialize_worker(server_address):
    # Runs once in each pool worker: create the channel and stub that all of
    # this worker's RPCs will reuse.
    global _worker_channel_singleton  # pylint: disable=global-statement
    global _worker_stub_singleton  # pylint: disable=global-statement
    _LOGGER.info('Initializing worker process.')
    _worker_channel_singleton = grpc.insecure_channel(server_address)
    _worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
        _worker_channel_singleton)
    atexit.register(_shutdown_worker)


def _run_worker_query(primality_candidate):
    # Issue a single unary RPC for one candidate integer.
    _LOGGER.info('Checking primality of %s.', primality_candidate)
    return _worker_stub_singleton.check(
        prime_pb2.PrimeCandidate(candidate=primality_candidate))


def _calculate_primes(server_address):
    # Fan the candidate range out across a pool of worker processes, each of
    # which holds its own channel to the server.
    worker_pool = multiprocessing.Pool(
        processes=_PROCESS_COUNT,
        initializer=_initialize_worker,
        initargs=(server_address,))
    check_range = range(2, _MAXIMUM_CANDIDATE)
    primality = worker_pool.map(_run_worker_query, check_range)
    primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
    return tuple(primes)


def main():
    msg = 'Determine the primality of the first {} integers.'.format(
        _MAXIMUM_CANDIDATE)
    parser = argparse.ArgumentParser(description=msg)
    parser.add_argument(
        'server_address',
        help='The address of the server (e.g. localhost:50051)')
    args = parser.parse_args()
    primes = _calculate_primes(args.server_address)
    print(primes)


if __name__ == '__main__':
    # Per-process logging setup: each worker inherits its own handler copy and
    # tags log lines with its PID.
    handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter('[PID %(process)d] %(message)s')
    handler.setFormatter(formatter)
    _LOGGER.addHandler(handler)
    _LOGGER.setLevel(logging.INFO)
    main()
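# Usage sketch (not part of the original file; the exact invocation depends on
# how the repository is laid out): assuming the generated prime_pb2 /
# prime_pb2_grpc modules are importable and a matching PrimeChecker server is
# already listening on, e.g., localhost:50051, the client takes the server
# address as its single positional argument:
#
#     python -m examples.python.multiprocessing.client localhost:50051
#
# Each of the _PROCESS_COUNT worker processes then logs with its own PID, and
# the parent process prints the resulting (candidate, isPrime) tuples.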
 |