|  | @@ -28,7 +28,7 @@
 | 
	
		
			
				|  |  |  @implementation GRXConcurrentWriteable {
 | 
	
		
			
				|  |  |    dispatch_queue_t _writeableQueue;
 | 
	
		
			
				|  |  |    // This ensures that writesFinishedWithError: is only sent once to the writeable.
 | 
	
		
			
				|  |  | -  dispatch_once_t _alreadyFinished;
 | 
	
		
			
				|  |  | +  BOOL _alreadyFinished;
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  - (instancetype)init {
 | 
	
	
		
			
				|  | @@ -65,19 +65,35 @@
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  - (void)enqueueSuccessfulCompletion {
 | 
	
		
			
				|  |  |    dispatch_async(_writeableQueue, ^{
 | 
	
		
			
				|  |  | -    dispatch_once(&_alreadyFinished, ^{
 | 
	
		
			
				|  |  | +    BOOL finished = NO;
 | 
	
		
			
				|  |  | +    @synchronized (self) {
 | 
	
		
			
				|  |  | +      if (!_alreadyFinished) {
 | 
	
		
			
				|  |  | +        _alreadyFinished = YES;
 | 
	
		
			
				|  |  | +      } else {
 | 
	
		
			
				|  |  | +        finished = YES;
 | 
	
		
			
				|  |  | +      }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +    if (!finished) {
 | 
	
		
			
				|  |  |        // Cancellation is now impossible. None of the other three blocks can run concurrently with
 | 
	
		
			
				|  |  |        // this one.
 | 
	
		
			
				|  |  |        [self.writeable writesFinishedWithError:nil];
 | 
	
		
			
				|  |  |        // Skip any possible message to the wrapped writeable enqueued after this one.
 | 
	
		
			
				|  |  |        self.writeable = nil;
 | 
	
		
			
				|  |  | -    });
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  |    });
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  - (void)cancelWithError:(NSError *)error {
 | 
	
		
			
				|  |  |    NSAssert(error, @"For a successful completion, use enqueueSuccessfulCompletion.");
 | 
	
		
			
				|  |  | -  dispatch_once(&_alreadyFinished, ^{
 | 
	
		
			
				|  |  | +  BOOL finished = NO;
 | 
	
		
			
				|  |  | +  @synchronized (self) {
 | 
	
		
			
				|  |  | +    if (!_alreadyFinished) {
 | 
	
		
			
				|  |  | +      _alreadyFinished = YES;
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      finished = YES;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (!finished) {
 | 
	
		
			
				|  |  |      // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
 | 
	
		
			
				|  |  |      // nillify writeable because we might be running concurrently with the blocks in
 | 
	
		
			
				|  |  |      // _writeableQueue, and assignment with ARC isn't atomic.
 | 
	
	
		
			
				|  | @@ -87,15 +103,23 @@
 | 
	
		
			
				|  |  |      dispatch_async(_writeableQueue, ^{
 | 
	
		
			
				|  |  |        [writeable writesFinishedWithError:error];
 | 
	
		
			
				|  |  |      });
 | 
	
		
			
				|  |  | -  });
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  - (void)cancelSilently {
 | 
	
		
			
				|  |  | -  dispatch_once(&_alreadyFinished, ^{
 | 
	
		
			
				|  |  | +  BOOL finished = NO;
 | 
	
		
			
				|  |  | +  @synchronized (self) {
 | 
	
		
			
				|  |  | +    if (!_alreadyFinished) {
 | 
	
		
			
				|  |  | +      _alreadyFinished = YES;
 | 
	
		
			
				|  |  | +    } else {
 | 
	
		
			
				|  |  | +      finished = YES;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  | +  if (!finished) {
 | 
	
		
			
				|  |  |      // Skip any of the still-enqueued messages to the wrapped writeable. We use the atomic setter to
 | 
	
		
			
				|  |  |      // nillify writeable because we might be running concurrently with the blocks in
 | 
	
		
			
				|  |  |      // _writeableQueue, and assignment with ARC isn't atomic.
 | 
	
		
			
				|  |  |      self.writeable = nil;
 | 
	
		
			
				|  |  | -  });
 | 
	
		
			
				|  |  | +  }
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  @end
 |