Bläddra i källkod

Refactor channel pool

Muxi Yan 7 år sedan
förälder
incheckning
459da578db

+ 7 - 8
src/objective-c/GRPCClient/GRPCCall.m

@@ -36,6 +36,8 @@
 #import "private/NSDictionary+GRPC.h"
 #import "private/NSError+GRPC.h"
 #import "private/utilities.h"
+#import "private/GRPCChannelPool.h"
+#import "private/GRPCCompletionQueue.h"
 
 // At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
 // SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
@@ -819,8 +821,11 @@ const char *kCFStreamVarName = "grpc_cfstream";
   _responseWriteable =
       [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
 
-  GRPCWrappedCall *wrappedCall =
-      [[GRPCWrappedCall alloc] initWithHost:_host path:_path callOptions:_callOptions];
+  GRPCPooledChannel *channel = [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
+  GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:_path
+                                              completionQueue:[GRPCCompletionQueue completionQueue]
+                                                  callOptions:_callOptions];
+
   if (wrappedCall == nil) {
     [self maybeFinishWithError:[NSError errorWithDomain:kGRPCErrorDomain
                                                    code:GRPCErrorCodeUnavailable
@@ -837,12 +842,6 @@ const char *kCFStreamVarName = "grpc_cfstream";
 
   [self sendHeaders];
   [self invokeCall];
-
-  // Connectivity monitor is not required for CFStream
-  char *enableCFStream = getenv(kCFStreamVarName);
-  if (enableCFStream == nil || enableCFStream[0] != '1') {
-    [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
-  }
 }
 
 - (void)startWithWriteable:(id<GRXWriteable>)writeable {

+ 21 - 14
src/objective-c/GRPCClient/private/GRPCChannelPool.h

@@ -32,10 +32,13 @@ NS_ASSUME_NONNULL_BEGIN
 @class GRPCChannelPool;
 @class GRPCCompletionQueue;
 @class GRPCChannelConfiguration;
+@class GRPCWrappedCall;
 
 /**
- * Channel proxy that can be retained and automatically reestablish connection when the channel is
- * disconnected.
+ * A proxied channel object that can be retained and creates GRPCWrappedCall object from. If a
+ * raw channel is not present (i.e. no tcp connection to the server) when a GRPCWrappedCall object
+ * is requested, it issues a connection/reconnection. The behavior of this object is to mimic that
+ * of gRPC core's channel object.
  */
 @interface GRPCPooledChannel : NSObject
 
@@ -47,24 +50,21 @@ NS_ASSUME_NONNULL_BEGIN
  * Initialize with an actual channel object \a channel and a reference to the channel pool.
  */
 - (nullable instancetype)initWithChannelConfiguration:
-                             (GRPCChannelConfiguration *)channelConfiguration
-                                          channelPool:(GRPCChannelPool *)channelPool
-    NS_DESIGNATED_INITIALIZER;
+                             (GRPCChannelConfiguration *)channelConfiguration;
 
 /**
- * Create a grpc core call object (grpc_call) from this channel. If channel is disconnected, get a
+ * Create a GRPCWrappedCall object (grpc_call) from this channel. If channel is disconnected, get a
  * new channel object from the channel pool.
  */
-- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
-                              completionQueue:(GRPCCompletionQueue *)queue
-                                  callOptions:(GRPCCallOptions *)callOptions;
+- (nullable GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path
+                                  completionQueue:(GRPCCompletionQueue *)queue
+                                      callOptions:(GRPCCallOptions *)callOptions;
 
 /**
- * Return ownership and destroy the grpc_call object created by
- * \a unmanagedCallWithPath:completionQueue:callOptions: and decrease channel refcount. If refcount
- * of the channel becomes 0, return the channel object to channel pool.
+ * Notify the pooled channel that a wrapped call object is no longer referenced and will be
+ * dealloc'ed.
  */
-- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall;
+- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall;
 
 /**
  * Force the channel to disconnect immediately. Subsequent calls to unmanagedCallWithPath: will
@@ -77,6 +77,13 @@ NS_ASSUME_NONNULL_BEGIN
 /** Test-only interface for \a GRPCPooledChannel. */
 @interface GRPCPooledChannel (Test)
 
+/**
+ * Initialize a pooled channel with non-default destroy delay for testing purpose.
+ */
+- (nullable instancetype)initWithChannelConfiguration:
+(GRPCChannelConfiguration *)channelConfiguration
+                                         destroyDelay:(NSTimeInterval)destroyDelay;
+
 /**
  * Return the pointer to the raw channel wrapped.
  */
@@ -118,7 +125,7 @@ NS_ASSUME_NONNULL_BEGIN
  * Get an instance of pool isolated from the global shared pool with channels' destroy delay being
  * \a destroyDelay.
  */
-- (nullable instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay;
+- (nullable instancetype)initTestPool;
 
 @end
 

+ 107 - 169
src/objective-c/GRPCClient/private/GRPCChannelPool.m

@@ -28,6 +28,8 @@
 #import "GRPCSecureChannelFactory.h"
 #import "utilities.h"
 #import "version.h"
+#import "GRPCWrappedCall.h"
+#import "GRPCCompletionQueue.h"
 
 #import <GRPCClient/GRPCCall+Cronet.h>
 #include <grpc/support/log.h>
@@ -40,103 +42,139 @@ static dispatch_once_t gInitChannelPool;
 /** When all calls of a channel are destroyed, destroy the channel after this much seconds. */
 static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
 
-@interface GRPCChannelPool ()
-
-- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration;
-
-- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration;
-
-@end
-
 @implementation GRPCPooledChannel {
-  __weak GRPCChannelPool *_channelPool;
   GRPCChannelConfiguration *_channelConfiguration;
-  NSMutableSet *_unmanagedCalls;
+  NSTimeInterval _destroyDelay;
+
+  NSHashTable<GRPCWrappedCall *> *_wrappedCalls;
   GRPCChannel *_wrappedChannel;
+  NSDate *_lastTimedDestroy;
+  dispatch_queue_t _timerQueue;
 }
 
-- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
-                                 channelPool:(GRPCChannelPool *)channelPool {
-  NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
-  NSAssert(channelPool != nil, @"channelPool cannot be empty.");
-  if (channelPool == nil || channelConfiguration == nil) {
-    return nil;
-  }
-
-  if ((self = [super init])) {
-    _channelPool = channelPool;
-    _channelConfiguration = channelConfiguration;
-    _unmanagedCalls = [NSMutableSet set];
-    _wrappedChannel = nil;
-  }
-
-  return self;
+- (instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration {
+  return [self initWithChannelConfiguration:channelConfiguration destroyDelay:kDefaultChannelDestroyDelay];
 }
 
 - (void)dealloc {
-  NSAssert([_unmanagedCalls count] == 0 && _wrappedChannel == nil,
-           @"Pooled channel should only be"
-            "destroyed after the wrapped channel is destroyed");
+  if ([_wrappedCalls objectEnumerator].allObjects.count != 0) {
+    NSEnumerator *enumerator = [_wrappedCalls objectEnumerator];
+    GRPCWrappedCall *wrappedCall;
+    while ((wrappedCall = [enumerator nextObject])) {
+      [wrappedCall channelDisconnected];
+    };
+  }
 }
 
-- (grpc_call *)unmanagedCallWithPath:(NSString *)path
-                     completionQueue:(GRPCCompletionQueue *)queue
-                         callOptions:(GRPCCallOptions *)callOptions {
+- (GRPCWrappedCall *)wrappedCallWithPath:(NSString *)path
+completionQueue:(GRPCCompletionQueue *)queue
+callOptions:(GRPCCallOptions *)callOptions {
   NSAssert(path.length > 0, @"path must not be empty.");
   NSAssert(queue != nil, @"completionQueue must not be empty.");
   NSAssert(callOptions, @"callOptions must not be empty.");
   if (path.length == 0 || queue == nil || callOptions == nil) return NULL;
 
-  grpc_call *call = NULL;
+  GRPCWrappedCall *call = nil;
+
   @synchronized(self) {
     if (_wrappedChannel == nil) {
-      __strong GRPCChannelPool *strongPool = _channelPool;
-      if (strongPool) {
-        _wrappedChannel = [strongPool refChannelWithConfiguration:_channelConfiguration];
+      _wrappedChannel = [[GRPCChannel alloc] initWithChannelConfiguration:_channelConfiguration];
+      if (_wrappedChannel == nil) {
+        NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy.");
+        return nil;
       }
-      NSAssert(_wrappedChannel != nil, @"Unable to get a raw channel for proxy.");
     }
-    call =
-        [_wrappedChannel unmanagedCallWithPath:path completionQueue:queue callOptions:callOptions];
-    if (call != NULL) {
-      [_unmanagedCalls addObject:[NSValue valueWithPointer:call]];
+    _lastTimedDestroy = nil;
+
+    grpc_call *unmanagedCall = [_wrappedChannel unmanagedCallWithPath:path
+                                                      completionQueue:[GRPCCompletionQueue completionQueue]
+                                                          callOptions:callOptions];
+    if (unmanagedCall == NULL) {
+      NSAssert(unmanagedCall != NULL, @"Unable to create grpc_call object");
+      return nil;
+    }
+
+    call = [[GRPCWrappedCall alloc] initWithUnmanagedCall:unmanagedCall pooledChannel:self];
+    if (call == nil) {
+      NSAssert(call != nil, @"Unable to create GRPCWrappedCall object");
+      return nil;
     }
+
+    [_wrappedCalls addObject:call];
   }
   return call;
 }
 
-- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall {
-  if (unmanagedCall == NULL) {
+- (void)notifyWrappedCallDealloc:(GRPCWrappedCall *)wrappedCall {
+  NSAssert(wrappedCall != nil, @"wrappedCall cannot be empty.");
+  if (wrappedCall == nil) {
     return;
   }
-
-  grpc_call_unref(unmanagedCall);
   @synchronized(self) {
-    NSValue *removedCall = [NSValue valueWithPointer:unmanagedCall];
-    [_unmanagedCalls removeObject:removedCall];
-    if ([_unmanagedCalls count] == 0) {
-      _wrappedChannel = nil;
-      GRPCChannelPool *strongPool = _channelPool;
-      [strongPool unrefChannelWithConfiguration:_channelConfiguration];
+    if ([_wrappedCalls objectEnumerator].allObjects.count == 0) {
+      NSDate *now = [NSDate date];
+      _lastTimedDestroy = now;
+      dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)_destroyDelay * NSEC_PER_SEC),
+                     _timerQueue, ^{
+                       @synchronized(self) {
+                         if (self->_lastTimedDestroy == now) {
+                           self->_wrappedChannel = nil;
+                           self->_lastTimedDestroy = nil;
+                         }
+                       }
+                     });
     }
   }
 }
 
 - (void)disconnect {
+  NSHashTable<GRPCWrappedCall *> *copiedWrappedCalls = nil;
   @synchronized(self) {
     if (_wrappedChannel != nil) {
       _wrappedChannel = nil;
-      [_unmanagedCalls removeAllObjects];
-      GRPCChannelPool *strongPool = _channelPool;
-      [strongPool unrefChannelWithConfiguration:_channelConfiguration];
+      copiedWrappedCalls = [_wrappedCalls copy];
+      [_wrappedCalls removeAllObjects];
     }
   }
+  NSEnumerator *enumerator = [copiedWrappedCalls objectEnumerator];
+  GRPCWrappedCall *wrappedCall;
+  while ((wrappedCall = [enumerator nextObject])) {
+    [wrappedCall channelDisconnected];
+  }
 }
 
 @end
 
 @implementation GRPCPooledChannel (Test)
 
+- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration *)channelConfiguration
+                                         destroyDelay:(NSTimeInterval)destroyDelay {
+  NSAssert(channelConfiguration != nil, @"channelConfiguration cannot be empty.");
+  if (channelConfiguration == nil) {
+    return nil;
+  }
+
+  if ((self = [super init])) {
+    _channelConfiguration = channelConfiguration;
+    _destroyDelay = destroyDelay;
+    _wrappedCalls = [[NSHashTable alloc] initWithOptions:NSHashTableWeakMemory capacity:1];
+    _wrappedChannel = nil;
+    _lastTimedDestroy = nil;
+#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
+    if (@available(iOS 8.0, macOS 10.10, *)) {
+      _timerQueue = dispatch_queue_create(NULL,
+                                          dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
+    } else {
+#else
+    {
+#endif
+      _timerQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
+    }
+  }
+
+  return self;
+}
+
 - (GRPCChannel *)wrappedChannel {
   GRPCChannel *channel = nil;
   @synchronized(self) {
@@ -147,65 +185,28 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
 
 @end
 
-/**
- * A convenience value type for cached channel.
- */
-@interface GRPCChannelRecord : NSObject
-
-/** Pointer to the raw channel. May be nil when the channel has been destroyed. */
-@property GRPCChannel *channel;
-
-/** Channel proxy corresponding to this channel configuration. */
-@property GRPCPooledChannel *pooledChannel;
-
-/** Last time when a timed destroy is initiated on the channel. */
-@property NSDate *timedDestroyDate;
-
-/** Reference count of the proxy to the channel. */
-@property NSUInteger refCount;
-
-@end
-
-@implementation GRPCChannelRecord
-
-@end
-
 @interface GRPCChannelPool ()
 
-- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay NS_DESIGNATED_INITIALIZER;
+- (instancetype)initInstance NS_DESIGNATED_INITIALIZER;
 
 @end
 
 @implementation GRPCChannelPool {
-  NSMutableDictionary<GRPCChannelConfiguration *, GRPCChannelRecord *> *_channelPool;
-  dispatch_queue_t _dispatchQueue;
-  NSTimeInterval _destroyDelay;
+  NSMutableDictionary<GRPCChannelConfiguration *, GRPCPooledChannel *> *_channelPool;
 }
 
 + (instancetype)sharedInstance {
   dispatch_once(&gInitChannelPool, ^{
     gChannelPool =
-        [[GRPCChannelPool alloc] initInstanceWithDestroyDelay:kDefaultChannelDestroyDelay];
+        [[GRPCChannelPool alloc] initInstance];
     NSAssert(gChannelPool != nil, @"Cannot initialize global channel pool.");
   });
   return gChannelPool;
 }
 
-- (instancetype)initInstanceWithDestroyDelay:(NSTimeInterval)destroyDelay {
+- (instancetype)initInstance {
   if ((self = [super init])) {
     _channelPool = [NSMutableDictionary dictionary];
-#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
-    if (@available(iOS 8.0, macOS 10.10, *)) {
-      _dispatchQueue = dispatch_queue_create(
-          NULL,
-          dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
-    } else {
-#else
-    {
-#endif
-      _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
-    }
-    _destroyDelay = destroyDelay;
 
     // Connectivity monitor is not required for CFStream
     char *enableCFStream = getenv(kCFStreamVarName);
@@ -231,86 +232,23 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
   GRPCChannelConfiguration *configuration =
       [[GRPCChannelConfiguration alloc] initWithHost:host callOptions:callOptions];
   @synchronized(self) {
-    GRPCChannelRecord *record = _channelPool[configuration];
-    if (record == nil) {
-      record = [[GRPCChannelRecord alloc] init];
-      record.pooledChannel =
-          [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration channelPool:self];
-      _channelPool[configuration] = record;
-      pooledChannel = record.pooledChannel;
-    } else {
-      pooledChannel = record.pooledChannel;
+    pooledChannel = _channelPool[configuration];
+    if (pooledChannel == nil) {
+      pooledChannel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:configuration];
+      _channelPool[configuration] = pooledChannel;
     }
   }
   return pooledChannel;
 }
 
-- (GRPCChannel *)refChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
-  GRPCChannel *ret = nil;
-  @synchronized(self) {
-    NSAssert(configuration != nil, @"configuration cannot be empty.");
-    if (configuration == nil) {
-      return nil;
-    }
-
-    GRPCChannelRecord *record = _channelPool[configuration];
-    NSAssert(record != nil, @"No record corresponding to a proxy.");
-    if (record == nil) {
-      return nil;
-    }
-
-    record.refCount++;
-    record.timedDestroyDate = nil;
-    if (record.channel == nil) {
-      // Channel is already destroyed;
-      record.channel = [[GRPCChannel alloc] initWithChannelConfiguration:configuration];
-    }
-    ret = record.channel;
-  }
-  return ret;
-}
-
-- (void)unrefChannelWithConfiguration:(GRPCChannelConfiguration *)configuration {
-  @synchronized(self) {
-    GRPCChannelRecord *record = _channelPool[configuration];
-    NSAssert(record != nil, @"No record corresponding to a proxy.");
-    if (record == nil) {
-      return;
-    }
-    NSAssert(record.refCount > 0, @"Inconsistent channel refcount.");
-    if (record.refCount > 0) {
-      record.refCount--;
-      if (record.refCount == 0) {
-        NSDate *now = [NSDate date];
-        record.timedDestroyDate = now;
-        dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_destroyDelay * NSEC_PER_SEC)),
-                       _dispatchQueue, ^{
-                         @synchronized(self) {
-                           if (now == record.timedDestroyDate) {
-                             // Destroy the raw channel and reset related records.
-                             record.timedDestroyDate = nil;
-                             record.channel = nil;
-                           }
-                         }
-                       });
-      }
-    }
-  }
-}
-
 - (void)disconnectAllChannels {
-  NSMutableSet<GRPCPooledChannel *> *proxySet = [NSMutableSet set];
+  NSDictionary *copiedPooledChannels;
   @synchronized(self) {
-    [_channelPool
-        enumerateKeysAndObjectsUsingBlock:^(GRPCChannelConfiguration *_Nonnull key,
-                                            GRPCChannelRecord *_Nonnull obj, BOOL *_Nonnull stop) {
-          obj.channel = nil;
-          obj.timedDestroyDate = nil;
-          [proxySet addObject:obj.pooledChannel];
-        }];
+    copiedPooledChannels = [NSDictionary dictionaryWithDictionary:_channelPool];
   }
-  // Disconnect proxies
-  [proxySet enumerateObjectsUsingBlock:^(GRPCPooledChannel *_Nonnull obj, BOOL *_Nonnull stop) {
+
+  // Disconnect pooled channels.
+  [copiedPooledChannels enumerateKeysAndObjectsUsingBlock:^(id  _Nonnull key, id  _Nonnull obj, BOOL * _Nonnull stop) {
     [obj disconnect];
   }];
 }
@@ -323,8 +261,8 @@ static const NSTimeInterval kDefaultChannelDestroyDelay = 30;
 
 @implementation GRPCChannelPool (Test)
 
-- (instancetype)initTestPoolWithDestroyDelay:(NSTimeInterval)destroyDelay {
-  return [self initInstanceWithDestroyDelay:destroyDelay];
+- (instancetype)initTestPool {
+  return [self initInstance];
 }
 
 @end

+ 10 - 3
src/objective-c/GRPCClient/private/GRPCWrappedCall.h

@@ -71,11 +71,16 @@
 
 #pragma mark GRPCWrappedCall
 
+@class GRPCPooledChannel;
+
 @interface GRPCWrappedCall : NSObject
 
-- (instancetype)initWithHost:(NSString *)host
-                        path:(NSString *)path
-                 callOptions:(GRPCCallOptions *)callOptions NS_DESIGNATED_INITIALIZER;
+- (instancetype)init NS_UNAVAILABLE;
+
++ (instancetype)new NS_UNAVAILABLE;
+
+- (instancetype)initWithUnmanagedCall:(grpc_call *)unmanagedCall
+                        pooledChannel:(GRPCPooledChannel *)pooledChannel NS_DESIGNATED_INITIALIZER;
 
 - (void)startBatchWithOperations:(NSArray *)ops errorHandler:(void (^)(void))errorHandler;
 
@@ -83,4 +88,6 @@
 
 - (void)cancel;
 
+- (void)channelDisconnected;
+
 @end

+ 63 - 54
src/objective-c/GRPCClient/private/GRPCWrappedCall.m

@@ -237,37 +237,21 @@
 #pragma mark GRPCWrappedCall
 
 @implementation GRPCWrappedCall {
-  GRPCCompletionQueue *_queue;
-  GRPCPooledChannel *_channel;
+  __weak GRPCPooledChannel *_channel;
   grpc_call *_call;
 }
 
-- (instancetype)init {
-  return [self initWithHost:nil path:nil callOptions:[[GRPCCallOptions alloc] init]];
-}
-
-- (instancetype)initWithHost:(NSString *)host
-                        path:(NSString *)path
-                 callOptions:(GRPCCallOptions *)callOptions {
-  NSAssert(host.length != 0 && path.length != 0, @"path and host cannot be nil.");
+- (instancetype)initWithUnmanagedCall:(grpc_call *)unmanagedCall
+                        pooledChannel:(GRPCPooledChannel *)pooledChannel {
+  NSAssert(unmanagedCall != NULL, @"unmanagedCall cannot be empty.");
+  NSAssert(pooledChannel != nil, @"pooledChannel cannot be empty.");
+  if (unmanagedCall == NULL || pooledChannel == nil) {
+    return nil;
+  }
 
   if ((self = [super init])) {
-    // Each completion queue consumes one thread. There's a trade to be made between creating and
-    // consuming too many threads and having contention of multiple calls in a single completion
-    // queue. Currently we use a singleton queue.
-    _queue = [GRPCCompletionQueue completionQueue];
-    _channel = [[GRPCChannelPool sharedInstance] channelWithHost:host callOptions:callOptions];
-    if (_channel == nil) {
-      NSAssert(_channel != nil, @"Failed to get a channel for the host.");
-      NSLog(@"Failed to get a channel for the host.");
-      return nil;
-    }
-    _call = [_channel unmanagedCallWithPath:path completionQueue:_queue callOptions:callOptions];
-    if (_call == nil) {
-      NSAssert(_channel != nil, @"Failed to get a channel for the host.");
-      NSLog(@"Failed to create a call.");
-      return nil;
-    }
+    _call = unmanagedCall;
+    _channel = pooledChannel;
   }
   return self;
 }
@@ -283,42 +267,67 @@
   [GRPCOpBatchLog addOpBatchToLog:operations];
 #endif
 
-  size_t nops = operations.count;
-  grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op));
-  size_t i = 0;
-  for (GRPCOperation *operation in operations) {
-    ops_array[i++] = operation.op;
-  }
-  grpc_call_error error =
-      grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(bool success) {
-                              if (!success) {
-                                if (errorHandler) {
-                                  errorHandler();
-                                } else {
-                                  return;
-                                }
-                              }
-                              for (GRPCOperation *operation in operations) {
-                                [operation finish];
-                              }
-                            }),
-                            NULL);
-  gpr_free(ops_array);
-
-  if (error != GRPC_CALL_OK) {
-    [NSException
+  @synchronized (self) {
+    if (_call != NULL) {
+      size_t nops = operations.count;
+      grpc_op *ops_array = gpr_malloc(nops * sizeof(grpc_op));
+      size_t i = 0;
+      for (GRPCOperation *operation in operations) {
+        ops_array[i++] = operation.op;
+      }
+      grpc_call_error error;
+      error = grpc_call_start_batch(_call, ops_array, nops, (__bridge_retained void *)(^(bool success) {
+        if (!success) {
+          if (errorHandler) {
+            errorHandler();
+          } else {
+            return;
+          }
+        }
+        for (GRPCOperation *operation in operations) {
+          [operation finish];
+        }
+      }),
+                                    NULL);
+      gpr_free(ops_array);
+
+      if (error != GRPC_CALL_OK) {
+        [NSException
          raise:NSInternalInconsistencyException
-        format:@"A precondition for calling grpc_call_start_batch wasn't met. Error %i", error];
+         format:@"A precondition for calling grpc_call_start_batch wasn't met. Error %i", error];
+      }
+    }
   }
 }
 
 - (void)cancel {
-  grpc_call_cancel(_call, NULL);
+  @synchronized (self) {
+    if (_call != NULL) {
+      grpc_call_cancel(_call, NULL);
+    }
+  }
+}
+
+- (void)channelDisconnected {
+  @synchronized (self) {
+    if (_call != NULL) {
+      grpc_call_unref(_call);
+      _call = NULL;
+    }
+  }
 }
 
 - (void)dealloc {
-  [_channel destroyUnmanagedCall:_call];
-  _channel = nil;
+  @synchronized (self) {
+    if (_call != NULL) {
+      grpc_call_unref(_call);
+      _call = NULL;
+    }
+  }
+  __strong GRPCPooledChannel *channel = _channel;
+  if (channel != nil) {
+    [channel notifyWrappedCallDealloc:self];
+  }
 }
 
 @end

+ 18 - 128
src/objective-c/tests/ChannelTests/ChannelPoolTest.m

@@ -24,11 +24,9 @@
 
 #define TEST_TIMEOUT 32
 
-NSString *kDummyHost = @"dummy.host";
-NSString *kDummyHost2 = @"dummy.host.2";
-NSString *kDummyPath = @"/dummy/path";
-
-const NSTimeInterval kDestroyDelay = 1.0;
+static NSString *kDummyHost = @"dummy.host";
+static NSString *kDummyHost2 = @"dummy.host.2";
+static NSString *kDummyPath = @"/dummy/path";
 
 @interface ChannelPoolTest : XCTestCase
 
@@ -40,134 +38,26 @@ const NSTimeInterval kDestroyDelay = 1.0;
   grpc_init();
 }
 
-- (void)testCreateChannelAndCall {
-  GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
-  GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
-  GRPCPooledChannel *channel =
-      (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
-  XCTAssertNil(channel.wrappedChannel);
-  GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
-  grpc_call *call =
-      [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-  XCTAssert(call != NULL);
-  XCTAssertNotNil(channel.wrappedChannel);
-  [channel destroyUnmanagedCall:call];
-  XCTAssertNil(channel.wrappedChannel);
-}
-
-- (void)testCacheChannel {
-  GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
+- (void)testCreateAndCacheChannel {
+  GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPool];
   GRPCCallOptions *options1 = [[GRPCCallOptions alloc] init];
   GRPCCallOptions *options2 = [options1 copy];
   GRPCMutableCallOptions *options3 = [options1 mutableCopy];
   options3.transportType = GRPCTransportTypeInsecure;
-  GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
-  GRPCPooledChannel *channel1 =
-      (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options1];
-  grpc_call *call1 =
-      [channel1 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options1];
-  GRPCPooledChannel *channel2 =
-      (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options2];
-  grpc_call *call2 =
-      [channel2 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options2];
-  GRPCPooledChannel *channel3 =
-      (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options3];
-  grpc_call *call3 =
-      [channel3 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options3];
-  GRPCPooledChannel *channel4 =
-      (GRPCPooledChannel *)[pool channelWithHost:kDummyHost2 callOptions:options1];
-  grpc_call *call4 =
-      [channel4 unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options1];
-  XCTAssertEqual(channel1.wrappedChannel, channel2.wrappedChannel);
-  XCTAssertNotEqual(channel1.wrappedChannel, channel3.wrappedChannel);
-  XCTAssertNotEqual(channel1.wrappedChannel, channel4.wrappedChannel);
-  XCTAssertNotEqual(channel3.wrappedChannel, channel4.wrappedChannel);
-  [channel1 destroyUnmanagedCall:call1];
-  [channel2 destroyUnmanagedCall:call2];
-  [channel3 destroyUnmanagedCall:call3];
-  [channel4 destroyUnmanagedCall:call4];
-}
-
-- (void)testTimedDestroyChannel {
-  GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
-  GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
-  GRPCPooledChannel *channel =
-      (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
-  GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
-  grpc_call *call =
-      [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-  GRPCChannel *wrappedChannel = channel.wrappedChannel;
-
-  [channel destroyUnmanagedCall:call];
-  // Confirm channel is not destroyed at this time
-  call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-  XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
-
-  [channel destroyUnmanagedCall:call];
-  sleep(kDestroyDelay + 1);
-  // Confirm channel is new at this time
-  call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-  XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel);
-
-  // Confirm the new channel can create call
-  XCTAssert(call != NULL);
-  [channel destroyUnmanagedCall:call];
-}
-
-- (void)testPoolDisconnection {
-  GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
-  GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
-  GRPCPooledChannel *channel =
-      (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
-  GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
-  grpc_call *call =
-      [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-  XCTAssertNotNil(channel.wrappedChannel);
-  GRPCChannel *wrappedChannel = channel.wrappedChannel;
-
-  // Test a new channel is created by requesting a channel from pool
-  [pool disconnectAllChannels];
-  channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
-  call = [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-  XCTAssertNotNil(channel.wrappedChannel);
-  XCTAssertNotEqual(wrappedChannel, channel.wrappedChannel);
-  wrappedChannel = channel.wrappedChannel;
-
-  // Test a new channel is created by requesting a new call from the previous proxy
-  [pool disconnectAllChannels];
-  grpc_call *call2 =
-      [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-  XCTAssertNotNil(channel.wrappedChannel);
-  XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel);
-  [channel destroyUnmanagedCall:call];
-  [channel destroyUnmanagedCall:call2];
-}
-
-- (void)testUnrefCallFromStaleChannel {
-  GRPCChannelPool *pool = [[GRPCChannelPool alloc] initTestPoolWithDestroyDelay:kDestroyDelay];
-  GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
-  GRPCPooledChannel *channel =
-      (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
-  GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
-  grpc_call *call =
-      [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-
-  [pool disconnectAllChannels];
-  channel = (GRPCPooledChannel *)[pool channelWithHost:kDummyHost callOptions:options];
 
-  grpc_call *call2 =
-      [channel unmanagedCallWithPath:kDummyPath completionQueue:cq callOptions:options];
-  // Test unref the call of a stale channel will not cause the current channel going into timed
-  // destroy state
-  XCTAssertNotNil(channel.wrappedChannel);
-  GRPCChannel *wrappedChannel = channel.wrappedChannel;
-  [channel destroyUnmanagedCall:call];
-  XCTAssertNotNil(channel.wrappedChannel);
-  XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
-  // Test unref the call of the current channel will cause the channel going into timed destroy
-  // state
-  [channel destroyUnmanagedCall:call2];
-  XCTAssertNil(channel.wrappedChannel);
+  GRPCPooledChannel *channel1 = [pool channelWithHost:kDummyHost callOptions:options1];
+  GRPCPooledChannel *channel2 = [pool channelWithHost:kDummyHost callOptions:options2];
+  GRPCPooledChannel *channel3 = [pool channelWithHost:kDummyHost callOptions:options3];
+  GRPCPooledChannel *channel4 = [pool channelWithHost:kDummyHost2 callOptions:options1];
+
+  XCTAssertNotNil(channel1);
+  XCTAssertNotNil(channel2);
+  XCTAssertNotNil(channel3);
+  XCTAssertNotNil(channel4);
+  XCTAssertEqual(channel1, channel2);
+  XCTAssertNotEqual(channel1, channel3);
+  XCTAssertNotEqual(channel1, channel4);
+  XCTAssertNotEqual(channel3, channel4);
 }
 
 @end

+ 80 - 72
src/objective-c/tests/ChannelTests/ChannelTests.m

@@ -22,91 +22,99 @@
 #import "../../GRPCClient/private/GRPCChannel.h"
 #import "../../GRPCClient/private/GRPCChannelPool.h"
 #import "../../GRPCClient/private/GRPCCompletionQueue.h"
+#import "../../GRPCClient/private/GRPCWrappedCall.h"
 
-/*
-#define TEST_TIMEOUT 8
-
-@interface GRPCChannelFake : NSObject
-
-- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation
-                         unrefExpectation:(XCTestExpectation *)unrefExpectation;
+static NSString *kDummyHost = @"dummy.host";
+static NSString *kDummyPath = @"/dummy/path";
 
-- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
-                              completionQueue:(GRPCCompletionQueue *)queue
-                                  callOptions:(GRPCCallOptions *)callOptions;
-
-- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall;
+@interface ChannelTests : XCTestCase
 
 @end
 
-@implementation GRPCChannelFake {
-  __weak XCTestExpectation *_createExpectation;
-  __weak XCTestExpectation *_unrefExpectation;
-  long _grpcCallCounter;
-}
-
-- (nullable instancetype)initWithChannelConfiguration:(GRPCChannelConfiguration
-*)channelConfiguration { return nil;
-}
-
-- (instancetype)initWithCreateExpectation:(XCTestExpectation *)createExpectation
-                         unrefExpectation:(XCTestExpectation *)unrefExpectation {
-  if ((self = [super init])) {
-    _createExpectation = createExpectation;
-    _unrefExpectation = unrefExpectation;
-    _grpcCallCounter = 0;
-  }
-  return self;
-}
+@implementation ChannelTests
 
-- (nullable grpc_call *)unmanagedCallWithPath:(NSString *)path
-                              completionQueue:(GRPCCompletionQueue *)queue
-                                  callOptions:(GRPCCallOptions *)callOptions {
-  if (_createExpectation) [_createExpectation fulfill];
-  return (grpc_call *)(++_grpcCallCounter);
++ (void)setUp {
+  grpc_init();
 }
 
-- (void)destroyUnmanagedCall:(grpc_call *)unmanagedCall {
-  if (_unrefExpectation) [_unrefExpectation fulfill];
+- (void)testPooledChannelCreatingChannel {
+  GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
+  GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost
+                                                                        callOptions:options];
+  GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config];
+  GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
+  GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:kDummyPath
+                                              completionQueue:cq
+                                                  callOptions:options];
+  XCTAssertNotNil(channel.wrappedChannel);
+  (void)wrappedCall;
 }
 
-@end
-
-@interface GRPCChannelPoolFake : NSObject
-
-- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation;
-
-- (GRPCChannel *)rawChannelWithHost:(NSString *)host callOptions:(GRPCCallOptions *)callOptions;
-
-- (void)delayedDestroyChannel;
-
-@end
-
-@implementation GRPCChannelPoolFake {
-  __weak XCTestExpectation *_delayedDestroyExpectation;
-}
-
-- (instancetype)initWithDelayedDestroyExpectation:(XCTestExpectation *)delayedDestroyExpectation {
-  if ((self = [super init])) {
-    _delayedDestroyExpectation = delayedDestroyExpectation;
+- (void)testTimedDestroyChannel {
+  const NSTimeInterval kDestroyDelay = 1.0;
+  GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
+  GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost
+                                                                        callOptions:options];
+  GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config
+                                                                          destroyDelay:kDestroyDelay];
+  GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
+  GRPCWrappedCall *wrappedCall;
+  GRPCChannel *wrappedChannel;
+  @autoreleasepool {
+    wrappedCall = [channel wrappedCallWithPath:kDummyPath
+                                                completionQueue:cq
+                                                    callOptions:options];
+    XCTAssertNotNil(channel.wrappedChannel);
+
+    // Unref and ref channel immediately; expect using the same raw channel.
+    wrappedChannel = channel.wrappedChannel;
+
+    wrappedCall = nil;
+    wrappedCall = [channel wrappedCallWithPath:kDummyPath
+                               completionQueue:cq
+                                   callOptions:options];
+    XCTAssertEqual(channel.wrappedChannel, wrappedChannel);
+
+    // Unref and ref channel after destroy delay; expect a new raw channel.
+    wrappedCall = nil;
   }
-  return self;
-}
-
-- (void)delayedDestroyChannel {
-  if (_delayedDestroyExpectation) [_delayedDestroyExpectation fulfill];
+  sleep(kDestroyDelay + 1);
+  XCTAssertNil(channel.wrappedChannel);
+  wrappedCall = [channel wrappedCallWithPath:kDummyPath
+                             completionQueue:cq
+                                 callOptions:options];
+  XCTAssertNotEqual(channel.wrappedChannel, wrappedChannel);
 }
 
-@end */
-
-@interface ChannelTests : XCTestCase
-
-@end
-
-@implementation ChannelTests
-
-+ (void)setUp {
-  grpc_init();
+- (void)testDisconnect {
+  const NSTimeInterval kDestroyDelay = 1.0;
+  GRPCCallOptions *options = [[GRPCCallOptions alloc] init];
+  GRPCChannelConfiguration *config = [[GRPCChannelConfiguration alloc] initWithHost:kDummyHost
+                                                                        callOptions:options];
+  GRPCPooledChannel *channel = [[GRPCPooledChannel alloc] initWithChannelConfiguration:config
+                                                                          destroyDelay:kDestroyDelay];
+  GRPCCompletionQueue *cq = [GRPCCompletionQueue completionQueue];
+  GRPCWrappedCall *wrappedCall = [channel wrappedCallWithPath:kDummyPath
+                                              completionQueue:cq
+                                                  callOptions:options];
+  XCTAssertNotNil(channel.wrappedChannel);
+
+  // Disconnect; expect wrapped channel to be dropped
+  [channel disconnect];
+  XCTAssertNil(channel.wrappedChannel);
+
+  // Create a new call and unref the old call; confirm that destroy of the old call does not make
+  // the channel disconnect, even after the destroy delay.
+  GRPCWrappedCall *wrappedCall2 = [channel wrappedCallWithPath:kDummyPath
+                                               completionQueue:cq
+                                                   callOptions:options];
+  XCTAssertNotNil(channel.wrappedChannel);
+  GRPCChannel *wrappedChannel = channel.wrappedChannel;
+  wrappedCall = nil;
+  sleep(kDestroyDelay + 1);
+  XCTAssertNotNil(channel.wrappedChannel);
+  XCTAssertEqual(wrappedChannel, channel.wrappedChannel);
+  (void)wrappedCall2;
 }
 
 @end

+ 0 - 5
src/objective-c/tests/Tests.xcodeproj/xcshareddata/xcschemes/ChannelTests.xcscheme

@@ -37,11 +37,6 @@
                BlueprintName = "ChannelTests"
                ReferencedContainer = "container:Tests.xcodeproj">
             </BuildableReference>
-            <SkippedTests>
-               <Test
-                  Identifier = "ChannelTests">
-               </Test>
-            </SkippedTests>
          </TestableReference>
       </Testables>
       <AdditionalOptions>