|  | @@ -29,6 +29,8 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  """Tests for the old '_low'."""
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +import Queue
 | 
	
		
			
				|  |  | +import threading
 | 
	
		
			
				|  |  |  import time
 | 
	
		
			
				|  |  |  import unittest
 | 
	
		
			
				|  |  |  
 | 
	
	
		
			
				|  | @@ -43,6 +45,7 @@ _BYTE_SEQUENCE_SEQUENCE = tuple(
 | 
	
		
			
				|  |  |      bytes(bytearray((row + column) % 256 for column in range(row)))
 | 
	
		
			
				|  |  |      for row in range(_STREAM_LENGTH))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  class LonelyClientTest(unittest.TestCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    def testLonelyClient(self):
 | 
	
	
		
			
				|  | @@ -79,6 +82,14 @@ class LonelyClientTest(unittest.TestCase):
 | 
	
		
			
				|  |  |      del completion_queue
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +def _drive_completion_queue(completion_queue, event_queue):
 | 
	
		
			
				|  |  | +  while True:
 | 
	
		
			
				|  |  | +    event = completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    if event.kind is _low.Event.Kind.STOP:
 | 
	
		
			
				|  |  | +      break
 | 
	
		
			
				|  |  | +    event_queue.put(event)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    def setUp(self):
 | 
	
	
		
			
				|  | @@ -88,24 +99,26 @@ class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |      self.server = _low.Server(self.server_completion_queue)
 | 
	
		
			
				|  |  |      port = self.server.add_http2_addr('[::]:0')
 | 
	
		
			
				|  |  |      self.server.start()
 | 
	
		
			
				|  |  | +    self.server_events = Queue.Queue()
 | 
	
		
			
				|  |  | +    self.server_completion_queue_thread = threading.Thread(
 | 
	
		
			
				|  |  | +        target=_drive_completion_queue,
 | 
	
		
			
				|  |  | +        args=(self.server_completion_queue, self.server_events))
 | 
	
		
			
				|  |  | +    self.server_completion_queue_thread.start()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      self.client_completion_queue = _low.CompletionQueue()
 | 
	
		
			
				|  |  |      self.channel = _low.Channel('%s:%d' % (self.host, port), None)
 | 
	
		
			
				|  |  | +    self.client_events = Queue.Queue()
 | 
	
		
			
				|  |  | +    self.client_completion_queue_thread = threading.Thread(
 | 
	
		
			
				|  |  | +        target=_drive_completion_queue,
 | 
	
		
			
				|  |  | +        args=(self.client_completion_queue, self.client_events))
 | 
	
		
			
				|  |  | +    self.client_completion_queue_thread.start()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    def tearDown(self):
 | 
	
		
			
				|  |  |      self.server.stop()
 | 
	
		
			
				|  |  |      self.server_completion_queue.stop()
 | 
	
		
			
				|  |  |      self.client_completion_queue.stop()
 | 
	
		
			
				|  |  | -    while True:
 | 
	
		
			
				|  |  | -      event = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | -      if event is not None and event.kind is _low.Event.Kind.STOP:
 | 
	
		
			
				|  |  | -        break
 | 
	
		
			
				|  |  | -    while True:
 | 
	
		
			
				|  |  | -      event = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | -      if event is not None and event.kind is _low.Event.Kind.STOP:
 | 
	
		
			
				|  |  | -        break
 | 
	
		
			
				|  |  | -    self.server_completion_queue = None
 | 
	
		
			
				|  |  | -    self.client_completion_queue = None
 | 
	
		
			
				|  |  | +    self.server_completion_queue_thread.join()
 | 
	
		
			
				|  |  | +    self.client_completion_queue_thread.join()
 | 
	
		
			
				|  |  |      del self.server
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    def _perform_echo_test(self, test_data):
 | 
	
	
		
			
				|  | @@ -144,7 +157,7 @@ class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |      client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      self.server.service(service_tag)
 | 
	
		
			
				|  |  | -    service_accepted = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    service_accepted = self.server_events.get()
 | 
	
		
			
				|  |  |      self.assertIsNotNone(service_accepted)
 | 
	
		
			
				|  |  |      self.assertIs(service_accepted.kind, _low.Event.Kind.SERVICE_ACCEPTED)
 | 
	
		
			
				|  |  |      self.assertIs(service_accepted.tag, service_tag)
 | 
	
	
		
			
				|  | @@ -165,7 +178,7 @@ class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |                               server_leading_binary_metadata_value)
 | 
	
		
			
				|  |  |      server_call.premetadata()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    metadata_accepted = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    metadata_accepted = self.client_events.get()
 | 
	
		
			
				|  |  |      self.assertIsNotNone(metadata_accepted)
 | 
	
		
			
				|  |  |      self.assertEqual(_low.Event.Kind.METADATA_ACCEPTED, metadata_accepted.kind)
 | 
	
		
			
				|  |  |      self.assertEqual(metadata_tag, metadata_accepted.tag)
 | 
	
	
		
			
				|  | @@ -179,14 +192,14 @@ class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      for datum in test_data:
 | 
	
		
			
				|  |  |        client_call.write(datum, write_tag)
 | 
	
		
			
				|  |  | -      write_accepted = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +      write_accepted = self.client_events.get()
 | 
	
		
			
				|  |  |        self.assertIsNotNone(write_accepted)
 | 
	
		
			
				|  |  |        self.assertIs(write_accepted.kind, _low.Event.Kind.WRITE_ACCEPTED)
 | 
	
		
			
				|  |  |        self.assertIs(write_accepted.tag, write_tag)
 | 
	
		
			
				|  |  |        self.assertIs(write_accepted.write_accepted, True)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |        server_call.read(read_tag)
 | 
	
		
			
				|  |  | -      read_accepted = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +      read_accepted = self.server_events.get()
 | 
	
		
			
				|  |  |        self.assertIsNotNone(read_accepted)
 | 
	
		
			
				|  |  |        self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
 | 
	
		
			
				|  |  |        self.assertEqual(read_tag, read_accepted.tag)
 | 
	
	
		
			
				|  | @@ -194,14 +207,14 @@ class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |        server_data.append(read_accepted.bytes)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |        server_call.write(read_accepted.bytes, write_tag)
 | 
	
		
			
				|  |  | -      write_accepted = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +      write_accepted = self.server_events.get()
 | 
	
		
			
				|  |  |        self.assertIsNotNone(write_accepted)
 | 
	
		
			
				|  |  |        self.assertEqual(_low.Event.Kind.WRITE_ACCEPTED, write_accepted.kind)
 | 
	
		
			
				|  |  |        self.assertEqual(write_tag, write_accepted.tag)
 | 
	
		
			
				|  |  |        self.assertTrue(write_accepted.write_accepted)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |        client_call.read(read_tag)
 | 
	
		
			
				|  |  | -      read_accepted = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +      read_accepted = self.client_events.get()
 | 
	
		
			
				|  |  |        self.assertIsNotNone(read_accepted)
 | 
	
		
			
				|  |  |        self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
 | 
	
		
			
				|  |  |        self.assertEqual(read_tag, read_accepted.tag)
 | 
	
	
		
			
				|  | @@ -209,14 +222,14 @@ class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |        client_data.append(read_accepted.bytes)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      client_call.complete(complete_tag)
 | 
	
		
			
				|  |  | -    complete_accepted = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    complete_accepted = self.client_events.get()
 | 
	
		
			
				|  |  |      self.assertIsNotNone(complete_accepted)
 | 
	
		
			
				|  |  |      self.assertIs(complete_accepted.kind, _low.Event.Kind.COMPLETE_ACCEPTED)
 | 
	
		
			
				|  |  |      self.assertIs(complete_accepted.tag, complete_tag)
 | 
	
		
			
				|  |  |      self.assertIs(complete_accepted.complete_accepted, True)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      server_call.read(read_tag)
 | 
	
		
			
				|  |  | -    read_accepted = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    read_accepted = self.server_events.get()
 | 
	
		
			
				|  |  |      self.assertIsNotNone(read_accepted)
 | 
	
		
			
				|  |  |      self.assertEqual(_low.Event.Kind.READ_ACCEPTED, read_accepted.kind)
 | 
	
		
			
				|  |  |      self.assertEqual(read_tag, read_accepted.tag)
 | 
	
	
		
			
				|  | @@ -228,8 +241,8 @@ class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |                               server_trailing_binary_metadata_value)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      server_call.status(_low.Status(_low.Code.OK, details), status_tag)
 | 
	
		
			
				|  |  | -    server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | -    server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    server_terminal_event_one = self.server_events.get()
 | 
	
		
			
				|  |  | +    server_terminal_event_two = self.server_events.get()
 | 
	
		
			
				|  |  |      if server_terminal_event_one.kind == _low.Event.Kind.COMPLETE_ACCEPTED:
 | 
	
		
			
				|  |  |        status_accepted = server_terminal_event_one
 | 
	
		
			
				|  |  |        rpc_accepted = server_terminal_event_two
 | 
	
	
		
			
				|  | @@ -246,8 +259,8 @@ class EchoTest(unittest.TestCase):
 | 
	
		
			
				|  |  |      self.assertEqual(_low.Status(_low.Code.OK, ''), rpc_accepted.status)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      client_call.read(read_tag)
 | 
	
		
			
				|  |  | -    client_terminal_event_one = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | -    client_terminal_event_two = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    client_terminal_event_one = self.client_events.get()
 | 
	
		
			
				|  |  | +    client_terminal_event_two = self.client_events.get()
 | 
	
		
			
				|  |  |      if client_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
 | 
	
		
			
				|  |  |        read_accepted = client_terminal_event_one
 | 
	
		
			
				|  |  |        finish_accepted = client_terminal_event_two
 | 
	
	
		
			
				|  | @@ -303,22 +316,26 @@ class CancellationTest(unittest.TestCase):
 | 
	
		
			
				|  |  |      self.server = _low.Server(self.server_completion_queue)
 | 
	
		
			
				|  |  |      port = self.server.add_http2_addr('[::]:0')
 | 
	
		
			
				|  |  |      self.server.start()
 | 
	
		
			
				|  |  | +    self.server_events = Queue.Queue()
 | 
	
		
			
				|  |  | +    self.server_completion_queue_thread = threading.Thread(
 | 
	
		
			
				|  |  | +        target=_drive_completion_queue,
 | 
	
		
			
				|  |  | +        args=(self.server_completion_queue, self.server_events))
 | 
	
		
			
				|  |  | +    self.server_completion_queue_thread.start()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      self.client_completion_queue = _low.CompletionQueue()
 | 
	
		
			
				|  |  |      self.channel = _low.Channel('%s:%d' % (self.host, port), None)
 | 
	
		
			
				|  |  | +    self.client_events = Queue.Queue()
 | 
	
		
			
				|  |  | +    self.client_completion_queue_thread = threading.Thread(
 | 
	
		
			
				|  |  | +        target=_drive_completion_queue,
 | 
	
		
			
				|  |  | +        args=(self.client_completion_queue, self.client_events))
 | 
	
		
			
				|  |  | +    self.client_completion_queue_thread.start()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    def tearDown(self):
 | 
	
		
			
				|  |  |      self.server.stop()
 | 
	
		
			
				|  |  |      self.server_completion_queue.stop()
 | 
	
		
			
				|  |  |      self.client_completion_queue.stop()
 | 
	
		
			
				|  |  | -    while True:
 | 
	
		
			
				|  |  | -      event = self.server_completion_queue.get(0)
 | 
	
		
			
				|  |  | -      if event is not None and event.kind is _low.Event.Kind.STOP:
 | 
	
		
			
				|  |  | -        break
 | 
	
		
			
				|  |  | -    while True:
 | 
	
		
			
				|  |  | -      event = self.client_completion_queue.get(0)
 | 
	
		
			
				|  |  | -      if event is not None and event.kind is _low.Event.Kind.STOP:
 | 
	
		
			
				|  |  | -        break
 | 
	
		
			
				|  |  | +    self.server_completion_queue_thread.join()
 | 
	
		
			
				|  |  | +    self.client_completion_queue_thread.join()
 | 
	
		
			
				|  |  |      del self.server
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |    def testCancellation(self):
 | 
	
	
		
			
				|  | @@ -340,29 +357,29 @@ class CancellationTest(unittest.TestCase):
 | 
	
		
			
				|  |  |      client_call.invoke(self.client_completion_queue, metadata_tag, finish_tag)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      self.server.service(service_tag)
 | 
	
		
			
				|  |  | -    service_accepted = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    service_accepted = self.server_events.get()
 | 
	
		
			
				|  |  |      server_call = service_accepted.service_acceptance.call
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      server_call.accept(self.server_completion_queue, finish_tag)
 | 
	
		
			
				|  |  |      server_call.premetadata()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    metadata_accepted = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    metadata_accepted = self.client_events.get()
 | 
	
		
			
				|  |  |      self.assertIsNotNone(metadata_accepted)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      for datum in test_data:
 | 
	
		
			
				|  |  |        client_call.write(datum, write_tag)
 | 
	
		
			
				|  |  | -      write_accepted = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +      write_accepted = self.client_events.get()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |        server_call.read(read_tag)
 | 
	
		
			
				|  |  | -      read_accepted = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +      read_accepted = self.server_events.get()
 | 
	
		
			
				|  |  |        server_data.append(read_accepted.bytes)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |        server_call.write(read_accepted.bytes, write_tag)
 | 
	
		
			
				|  |  | -      write_accepted = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +      write_accepted = self.server_events.get()
 | 
	
		
			
				|  |  |        self.assertIsNotNone(write_accepted)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |        client_call.read(read_tag)
 | 
	
		
			
				|  |  | -      read_accepted = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +      read_accepted = self.client_events.get()
 | 
	
		
			
				|  |  |        client_data.append(read_accepted.bytes)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      client_call.cancel()
 | 
	
	
		
			
				|  | @@ -373,8 +390,8 @@ class CancellationTest(unittest.TestCase):
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |      server_call.read(read_tag)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    server_terminal_event_one = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | -    server_terminal_event_two = self.server_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    server_terminal_event_one = self.server_events.get()
 | 
	
		
			
				|  |  | +    server_terminal_event_two = self.server_events.get()
 | 
	
		
			
				|  |  |      if server_terminal_event_one.kind == _low.Event.Kind.READ_ACCEPTED:
 | 
	
		
			
				|  |  |        read_accepted = server_terminal_event_one
 | 
	
		
			
				|  |  |        rpc_accepted = server_terminal_event_two
 | 
	
	
		
			
				|  | @@ -388,7 +405,7 @@ class CancellationTest(unittest.TestCase):
 | 
	
		
			
				|  |  |      self.assertEqual(_low.Event.Kind.FINISH, rpc_accepted.kind)
 | 
	
		
			
				|  |  |      self.assertEqual(_low.Status(_low.Code.CANCELLED, ''), rpc_accepted.status)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -    finish_event = self.client_completion_queue.get(_FUTURE)
 | 
	
		
			
				|  |  | +    finish_event = self.client_events.get()
 | 
	
		
			
				|  |  |      self.assertEqual(_low.Event.Kind.FINISH, finish_event.kind)
 | 
	
		
			
				|  |  |      self.assertEqual(_low.Status(_low.Code.CANCELLED, 'Cancelled'), 
 | 
	
		
			
				|  |  |                                   finish_event.status)
 |