|  | @@ -0,0 +1,410 @@
 | 
	
		
			
				|  |  | +# Copyright 2015, Google Inc.
 | 
	
		
			
				|  |  | +# All rights reserved.
 | 
	
		
			
				|  |  | +#
 | 
	
		
			
				|  |  | +# Redistribution and use in source and binary forms, with or without
 | 
	
		
			
				|  |  | +# modification, are permitted provided that the following conditions are
 | 
	
		
			
				|  |  | +# met:
 | 
	
		
			
				|  |  | +#
 | 
	
		
			
				|  |  | +#     * Redistributions of source code must retain the above copyright
 | 
	
		
			
				|  |  | +# notice, this list of conditions and the following disclaimer.
 | 
	
		
			
				|  |  | +#     * Redistributions in binary form must reproduce the above
 | 
	
		
			
				|  |  | +# copyright notice, this list of conditions and the following disclaimer
 | 
	
		
			
				|  |  | +# in the documentation and/or other materials provided with the
 | 
	
		
			
				|  |  | +# distribution.
 | 
	
		
			
				|  |  | +#     * Neither the name of Google Inc. nor the names of its
 | 
	
		
			
				|  |  | +# contributors may be used to endorse or promote products derived from
 | 
	
		
			
				|  |  | +# this software without specific prior written permission.
 | 
	
		
			
				|  |  | +#
 | 
	
		
			
				|  |  | +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 | 
	
		
			
				|  |  | +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 | 
	
		
			
				|  |  | +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 | 
	
		
			
				|  |  | +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 | 
	
		
			
				|  |  | +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 | 
	
		
			
				|  |  | +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 | 
	
		
			
				|  |  | +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 | 
	
		
			
				|  |  | +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 | 
	
		
			
				|  |  | +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 | 
	
		
			
				|  |  | +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 | 
	
		
			
				|  |  | +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +"""State and behavior for ingestion during an operation."""
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import abc
 | 
	
		
			
				|  |  | +import collections
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +from grpc.framework.core import _constants
 | 
	
		
			
				|  |  | +from grpc.framework.core import _interfaces
 | 
	
		
			
				|  |  | +from grpc.framework.foundation import abandonment
 | 
	
		
			
				|  |  | +from grpc.framework.foundation import callable_util
 | 
	
		
			
				|  |  | +from grpc.framework.interfaces.base import base
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +_CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE = 'Exception initializing ingestion!'
 | 
	
		
			
				|  |  | +_INGESTION_EXCEPTION_LOG_MESSAGE = 'Exception during ingestion!'
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class _SubscriptionCreation(collections.namedtuple(
 | 
	
		
			
				|  |  | +    '_SubscriptionCreation', ('subscription', 'remote_error', 'abandoned'))):
 | 
	
		
			
				|  |  | +  """A sum type for the outcome of ingestion initialization.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Either subscription will be non-None, remote_error will be True, or abandoned
 | 
	
		
			
				|  |  | +  will be True.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Attributes:
 | 
	
		
			
				|  |  | +    subscription: A base.Subscription describing the customer's interest in
 | 
	
		
			
				|  |  | +      operation values from the other side.
 | 
	
		
			
				|  |  | +    remote_error: A boolean indicating that the subscription could not be
 | 
	
		
			
				|  |  | +      created due to an error on the remote side of the operation.
 | 
	
		
			
				|  |  | +    abandoned: A boolean indicating that subscription creation was abandoned.
 | 
	
		
			
				|  |  | +  """
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class _SubscriptionCreator(object):
 | 
	
		
			
				|  |  | +  """Common specification of subscription-creating behavior."""
 | 
	
		
			
				|  |  | +  __metaclass__ = abc.ABCMeta
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  @abc.abstractmethod
 | 
	
		
			
				|  |  | +  def create(self, group, method):
 | 
	
		
			
				|  |  | +    """Creates the base.Subscription of the local customer.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Any exceptions raised by this method should be attributed to and treated as
 | 
	
		
			
				|  |  | +    defects in the customer code called by this method.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Args:
 | 
	
		
			
				|  |  | +      group: The group identifier of the operation.
 | 
	
		
			
				|  |  | +      method: The method identifier of the operation.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Returns:
 | 
	
		
			
				|  |  | +      A _SubscriptionCreation describing the result of subscription creation.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    raise NotImplementedError()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class _ServiceSubscriptionCreator(_SubscriptionCreator):
 | 
	
		
			
				|  |  | +  """A _SubscriptionCreator appropriate for service-side use."""
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def __init__(self, servicer, operation_context, output_operator):
 | 
	
		
			
				|  |  | +    """Constructor.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Args:
 | 
	
		
			
				|  |  | +      servicer: The base.Servicer that will service the operation.
 | 
	
		
			
				|  |  | +      operation_context: A base.OperationContext for the operation to be passed
 | 
	
		
			
				|  |  | +        to the customer.
 | 
	
		
			
				|  |  | +      output_operator: A base.Operator for the operation to be passed to the
 | 
	
		
			
				|  |  | +        customer and to be called by the customer to accept operation data
 | 
	
		
			
				|  |  | +        emitted by the customer.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    self._servicer = servicer
 | 
	
		
			
				|  |  | +    self._operation_context = operation_context
 | 
	
		
			
				|  |  | +    self._output_operator = output_operator
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def create(self, group, method):
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +      subscription = self._servicer.service(
 | 
	
		
			
				|  |  | +          group, method, self._operation_context, self._output_operator)
 | 
	
		
			
				|  |  | +    except base.NoSuchMethodError:
 | 
	
		
			
				|  |  | +      return _SubscriptionCreation(None, True, False)
 | 
	
		
			
				|  |  | +    except abandonment.Abandoned:
 | 
	
		
			
				|  |  | +      return _SubscriptionCreation(None, False, True)
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +      return _SubscriptionCreation(subscription, False, False)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def _wrap(behavior):
 | 
	
		
			
				|  |  | +  def wrapped(*args, **kwargs):
 | 
	
		
			
				|  |  | +    try:
 | 
	
		
			
				|  |  | +      behavior(*args, **kwargs)
 | 
	
		
			
				|  |  | +    except abandonment.Abandoned:
 | 
	
		
			
				|  |  | +      return False
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +      return True
 | 
	
		
			
				|  |  | +  return wrapped
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +class _IngestionManager(_interfaces.IngestionManager):
 | 
	
		
			
				|  |  | +  """An implementation of _interfaces.IngestionManager."""
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def __init__(
 | 
	
		
			
				|  |  | +      self, lock, pool, subscription, subscription_creator, termination_manager,
 | 
	
		
			
				|  |  | +      transmission_manager, expiration_manager):
 | 
	
		
			
				|  |  | +    """Constructor.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Args:
 | 
	
		
			
				|  |  | +      lock: The operation-wide lock.
 | 
	
		
			
				|  |  | +      pool: A thread pool in which to execute customer code.
 | 
	
		
			
				|  |  | +      subscription: A base.Subscription describing the customer's interest in
 | 
	
		
			
				|  |  | +        operation values from the other side. May be None if
 | 
	
		
			
				|  |  | +        subscription_creator is not None.
 | 
	
		
			
				|  |  | +      subscription_creator: A _SubscriptionCreator wrapping the portion of
 | 
	
		
			
				|  |  | +        customer code that when called returns the base.Subscription describing
 | 
	
		
			
				|  |  | +        the customer's interest in operation values from the other side. May be
 | 
	
		
			
				|  |  | +        None if subscription is not None.
 | 
	
		
			
				|  |  | +      termination_manager: The _interfaces.TerminationManager for the operation.
 | 
	
		
			
				|  |  | +      transmission_manager: The _interfaces.TransmissionManager for the
 | 
	
		
			
				|  |  | +        operation.
 | 
	
		
			
				|  |  | +      expiration_manager: The _interfaces.ExpirationManager for the operation.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    self._lock = lock
 | 
	
		
			
				|  |  | +    self._pool = pool
 | 
	
		
			
				|  |  | +    self._termination_manager = termination_manager
 | 
	
		
			
				|  |  | +    self._transmission_manager = transmission_manager
 | 
	
		
			
				|  |  | +    self._expiration_manager = expiration_manager
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    if subscription is None:
 | 
	
		
			
				|  |  | +      self._subscription_creator = subscription_creator
 | 
	
		
			
				|  |  | +      self._wrapped_operator = None
 | 
	
		
			
				|  |  | +    elif subscription.kind is base.Subscription.Kind.FULL:
 | 
	
		
			
				|  |  | +      self._subscription_creator = None
 | 
	
		
			
				|  |  | +      self._wrapped_operator = _wrap(subscription.operator.advance)
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +      # TODO(nathaniel): Support other subscriptions.
 | 
	
		
			
				|  |  | +      raise ValueError('Unsupported subscription "%s"!' % subscription.kind)
 | 
	
		
			
				|  |  | +    self._pending_initial_metadata = None
 | 
	
		
			
				|  |  | +    self._pending_payloads = []
 | 
	
		
			
				|  |  | +    self._pending_completion = None
 | 
	
		
			
				|  |  | +    self._local_allowance = 1
 | 
	
		
			
				|  |  | +    # A nonnegative integer or None, with None indicating that the local
 | 
	
		
			
				|  |  | +    # customer is done emitting anyway so there's no need to bother it by
 | 
	
		
			
				|  |  | +    # informing it that the remote customer has granted it further permission to
 | 
	
		
			
				|  |  | +    # emit.
 | 
	
		
			
				|  |  | +    self._remote_allowance = 0
 | 
	
		
			
				|  |  | +    self._processing = False
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def _abort_internal_only(self):
 | 
	
		
			
				|  |  | +    self._subscription_creator = None
 | 
	
		
			
				|  |  | +    self._wrapped_operator = None
 | 
	
		
			
				|  |  | +    self._pending_initial_metadata = None
 | 
	
		
			
				|  |  | +    self._pending_payloads = None
 | 
	
		
			
				|  |  | +    self._pending_completion = None
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def _abort_and_notify(self, outcome):
 | 
	
		
			
				|  |  | +    self._abort_internal_only()
 | 
	
		
			
				|  |  | +    self._termination_manager.abort(outcome)
 | 
	
		
			
				|  |  | +    self._transmission_manager.abort(outcome)
 | 
	
		
			
				|  |  | +    self._expiration_manager.terminate()
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def _operator_next(self):
 | 
	
		
			
				|  |  | +    """Computes the next step for full-subscription ingestion.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    Returns:
 | 
	
		
			
				|  |  | +      An initial_metadata, payload, completion, allowance, continue quintet
 | 
	
		
			
				|  |  | +        indicating what operation values (if any) are available to pass into
 | 
	
		
			
				|  |  | +        customer code and whether or not there is anything immediately
 | 
	
		
			
				|  |  | +        actionable to call customer code to do.
 | 
	
		
			
				|  |  | +    """
 | 
	
		
			
				|  |  | +    if self._wrapped_operator is None:
 | 
	
		
			
				|  |  | +      return None, None, None, None, False
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +      initial_metadata, payload, completion, allowance, action = [None] * 5
 | 
	
		
			
				|  |  | +      if self._pending_initial_metadata is not None:
 | 
	
		
			
				|  |  | +        initial_metadata = self._pending_initial_metadata
 | 
	
		
			
				|  |  | +        self._pending_initial_metadata = None
 | 
	
		
			
				|  |  | +        action = True
 | 
	
		
			
				|  |  | +      if self._pending_payloads and 0 < self._local_allowance:
 | 
	
		
			
				|  |  | +        payload = self._pending_payloads.pop(0)
 | 
	
		
			
				|  |  | +        self._local_allowance -= 1
 | 
	
		
			
				|  |  | +        action = True
 | 
	
		
			
				|  |  | +      if not self._pending_payloads and self._pending_completion is not None:
 | 
	
		
			
				|  |  | +        completion = self._pending_completion
 | 
	
		
			
				|  |  | +        self._pending_completion = None
 | 
	
		
			
				|  |  | +        action = True
 | 
	
		
			
				|  |  | +      if self._remote_allowance is not None and 0 < self._remote_allowance:
 | 
	
		
			
				|  |  | +        allowance = self._remote_allowance
 | 
	
		
			
				|  |  | +        self._remote_allowance = 0
 | 
	
		
			
				|  |  | +        action = True
 | 
	
		
			
				|  |  | +      return initial_metadata, payload, completion, allowance, bool(action)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def _operator_process(
 | 
	
		
			
				|  |  | +      self, wrapped_operator, initial_metadata, payload,
 | 
	
		
			
				|  |  | +      completion, allowance):
 | 
	
		
			
				|  |  | +    while True:
 | 
	
		
			
				|  |  | +      advance_outcome = callable_util.call_logging_exceptions(
 | 
	
		
			
				|  |  | +          wrapped_operator, _INGESTION_EXCEPTION_LOG_MESSAGE,
 | 
	
		
			
				|  |  | +          initial_metadata=initial_metadata, payload=payload,
 | 
	
		
			
				|  |  | +          completion=completion, allowance=allowance)
 | 
	
		
			
				|  |  | +      if advance_outcome.exception is None:
 | 
	
		
			
				|  |  | +        if advance_outcome.return_value:
 | 
	
		
			
				|  |  | +          with self._lock:
 | 
	
		
			
				|  |  | +            if self._termination_manager.outcome is not None:
 | 
	
		
			
				|  |  | +              return
 | 
	
		
			
				|  |  | +            if completion is not None:
 | 
	
		
			
				|  |  | +              self._termination_manager.ingestion_complete()
 | 
	
		
			
				|  |  | +            initial_metadata, payload, completion, allowance, moar = (
 | 
	
		
			
				|  |  | +                self._operator_next())
 | 
	
		
			
				|  |  | +            if not moar:
 | 
	
		
			
				|  |  | +              self._processing = False
 | 
	
		
			
				|  |  | +              return
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +          with self._lock:
 | 
	
		
			
				|  |  | +            if self._termination_manager.outcome is None:
 | 
	
		
			
				|  |  | +              self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
 | 
	
		
			
				|  |  | +            return
 | 
	
		
			
				|  |  | +      else:
 | 
	
		
			
				|  |  | +        with self._lock:
 | 
	
		
			
				|  |  | +          if self._termination_manager.outcome is None:
 | 
	
		
			
				|  |  | +            self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
 | 
	
		
			
				|  |  | +          return
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def _operator_post_create(self, subscription):
 | 
	
		
			
				|  |  | +    wrapped_operator = _wrap(subscription.operator.advance)
 | 
	
		
			
				|  |  | +    with self._lock:
 | 
	
		
			
				|  |  | +      if self._termination_manager.outcome is not None:
 | 
	
		
			
				|  |  | +        return
 | 
	
		
			
				|  |  | +      self._wrapped_operator = wrapped_operator
 | 
	
		
			
				|  |  | +      self._subscription_creator = None
 | 
	
		
			
				|  |  | +      metadata, payload, completion, allowance, moar = self._operator_next()
 | 
	
		
			
				|  |  | +      if not moar:
 | 
	
		
			
				|  |  | +        self._processing = False
 | 
	
		
			
				|  |  | +        return
 | 
	
		
			
				|  |  | +    self._operator_process(
 | 
	
		
			
				|  |  | +        wrapped_operator, metadata, payload, completion, allowance)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def _create(self, subscription_creator, group, name):
 | 
	
		
			
				|  |  | +    outcome = callable_util.call_logging_exceptions(
 | 
	
		
			
				|  |  | +        subscription_creator.create, _CREATE_SUBSCRIPTION_EXCEPTION_LOG_MESSAGE,
 | 
	
		
			
				|  |  | +        group, name)
 | 
	
		
			
				|  |  | +    if outcome.return_value is None:
 | 
	
		
			
				|  |  | +      with self._lock:
 | 
	
		
			
				|  |  | +        if self._termination_manager.outcome is None:
 | 
	
		
			
				|  |  | +          self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
 | 
	
		
			
				|  |  | +    elif outcome.return_value.abandoned:
 | 
	
		
			
				|  |  | +      with self._lock:
 | 
	
		
			
				|  |  | +        if self._termination_manager.outcome is None:
 | 
	
		
			
				|  |  | +          self._abort_and_notify(base.Outcome.LOCAL_FAILURE)
 | 
	
		
			
				|  |  | +    elif outcome.return_value.remote_error:
 | 
	
		
			
				|  |  | +      with self._lock:
 | 
	
		
			
				|  |  | +        if self._termination_manager.outcome is None:
 | 
	
		
			
				|  |  | +          self._abort_and_notify(base.Outcome.REMOTE_FAILURE)
 | 
	
		
			
				|  |  | +    elif outcome.return_value.subscription.kind is base.Subscription.Kind.FULL:
 | 
	
		
			
				|  |  | +      self._operator_post_create(outcome.return_value.subscription)
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +      # TODO(nathaniel): Support other subscriptions.
 | 
	
		
			
				|  |  | +      raise ValueError(
 | 
	
		
			
				|  |  | +          'Unsupported "%s"!' % outcome.return_value.subscription.kind)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def _store_advance(self, initial_metadata, payload, completion, allowance):
 | 
	
		
			
				|  |  | +    if initial_metadata is not None:
 | 
	
		
			
				|  |  | +      self._pending_initial_metadata = initial_metadata
 | 
	
		
			
				|  |  | +    if payload is not None:
 | 
	
		
			
				|  |  | +      self._pending_payloads.append(payload)
 | 
	
		
			
				|  |  | +    if completion is not None:
 | 
	
		
			
				|  |  | +      self._pending_completion = completion
 | 
	
		
			
				|  |  | +    if allowance is not None and self._remote_allowance is not None:
 | 
	
		
			
				|  |  | +      self._remote_allowance += allowance
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def _operator_advance(self, initial_metadata, payload, completion, allowance):
 | 
	
		
			
				|  |  | +    if self._processing:
 | 
	
		
			
				|  |  | +      self._store_advance(initial_metadata, payload, completion, allowance)
 | 
	
		
			
				|  |  | +    else:
 | 
	
		
			
				|  |  | +      action = False
 | 
	
		
			
				|  |  | +      if initial_metadata is not None:
 | 
	
		
			
				|  |  | +        action = True
 | 
	
		
			
				|  |  | +      if payload is not None:
 | 
	
		
			
				|  |  | +        if 0 < self._local_allowance:
 | 
	
		
			
				|  |  | +          self._local_allowance -= 1
 | 
	
		
			
				|  |  | +          action = True
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +          self._pending_payloads.append(payload)
 | 
	
		
			
				|  |  | +          payload = False
 | 
	
		
			
				|  |  | +      if completion is not None:
 | 
	
		
			
				|  |  | +        if self._pending_payloads:
 | 
	
		
			
				|  |  | +          self._pending_completion = completion
 | 
	
		
			
				|  |  | +        else:
 | 
	
		
			
				|  |  | +          action = True
 | 
	
		
			
				|  |  | +      if allowance is not None and self._remote_allowance is not None:
 | 
	
		
			
				|  |  | +        allowance += self._remote_allowance
 | 
	
		
			
				|  |  | +        self._remote_allowance = 0
 | 
	
		
			
				|  |  | +        action = True
 | 
	
		
			
				|  |  | +      if action:
 | 
	
		
			
				|  |  | +        self._pool.submit(
 | 
	
		
			
				|  |  | +            callable_util.with_exceptions_logged(
 | 
	
		
			
				|  |  | +                self._operator_process, _constants.INTERNAL_ERROR_LOG_MESSAGE),
 | 
	
		
			
				|  |  | +            self._wrapped_operator, initial_metadata, payload, completion,
 | 
	
		
			
				|  |  | +            allowance)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def set_group_and_method(self, group, method):
 | 
	
		
			
				|  |  | +    """See _interfaces.IngestionManager.set_group_and_method for spec."""
 | 
	
		
			
				|  |  | +    if self._subscription_creator is not None and not self._processing:
 | 
	
		
			
				|  |  | +      self._pool.submit(
 | 
	
		
			
				|  |  | +          callable_util.with_exceptions_logged(
 | 
	
		
			
				|  |  | +              self._create, _constants.INTERNAL_ERROR_LOG_MESSAGE),
 | 
	
		
			
				|  |  | +          self._subscription_creator, group, method)
 | 
	
		
			
				|  |  | +      self._processing = True
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def add_local_allowance(self, allowance):
 | 
	
		
			
				|  |  | +    """See _interfaces.IngestionManager.add_local_allowance for spec."""
 | 
	
		
			
				|  |  | +    if any((self._subscription_creator, self._wrapped_operator,)):
 | 
	
		
			
				|  |  | +      self._local_allowance += allowance
 | 
	
		
			
				|  |  | +      if not self._processing:
 | 
	
		
			
				|  |  | +        initial_metadata, payload, completion, allowance, moar = (
 | 
	
		
			
				|  |  | +            self._operator_next())
 | 
	
		
			
				|  |  | +        if moar:
 | 
	
		
			
				|  |  | +          self._pool.submit(
 | 
	
		
			
				|  |  | +              callable_util.with_exceptions_logged(
 | 
	
		
			
				|  |  | +                  self._operator_process,
 | 
	
		
			
				|  |  | +                  _constants.INTERNAL_ERROR_LOG_MESSAGE),
 | 
	
		
			
				|  |  | +              initial_metadata, payload, completion, allowance)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def local_emissions_done(self):
 | 
	
		
			
				|  |  | +    self._remote_allowance = None
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  def advance(self, initial_metadata, payload, completion, allowance):
 | 
	
		
			
				|  |  | +    """See _interfaces.IngestionManager.advance for specification."""
 | 
	
		
			
				|  |  | +    if self._subscription_creator is not None:
 | 
	
		
			
				|  |  | +      self._store_advance(initial_metadata, payload, completion, allowance)
 | 
	
		
			
				|  |  | +    elif self._wrapped_operator is not None:
 | 
	
		
			
				|  |  | +      self._operator_advance(initial_metadata, payload, completion, allowance)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def invocation_ingestion_manager(
 | 
	
		
			
				|  |  | +    subscription, lock, pool, termination_manager, transmission_manager,
 | 
	
		
			
				|  |  | +    expiration_manager):
 | 
	
		
			
				|  |  | +  """Creates an IngestionManager appropriate for invocation-side use.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Args:
 | 
	
		
			
				|  |  | +    subscription: A base.Subscription indicating the customer's interest in the
 | 
	
		
			
				|  |  | +      data and results from the service-side of the operation.
 | 
	
		
			
				|  |  | +    lock: The operation-wide lock.
 | 
	
		
			
				|  |  | +    pool: A thread pool in which to execute customer code.
 | 
	
		
			
				|  |  | +    termination_manager: The _interfaces.TerminationManager for the operation.
 | 
	
		
			
				|  |  | +    transmission_manager: The _interfaces.TransmissionManager for the
 | 
	
		
			
				|  |  | +      operation.
 | 
	
		
			
				|  |  | +    expiration_manager: The _interfaces.ExpirationManager for the operation.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Returns:
 | 
	
		
			
				|  |  | +    An IngestionManager appropriate for invocation-side use.
 | 
	
		
			
				|  |  | +  """
 | 
	
		
			
				|  |  | +  return _IngestionManager(
 | 
	
		
			
				|  |  | +      lock, pool, subscription, None, termination_manager, transmission_manager,
 | 
	
		
			
				|  |  | +      expiration_manager)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +def service_ingestion_manager(
 | 
	
		
			
				|  |  | +    servicer, operation_context, output_operator, lock, pool,
 | 
	
		
			
				|  |  | +    termination_manager, transmission_manager, expiration_manager):
 | 
	
		
			
				|  |  | +  """Creates an IngestionManager appropriate for service-side use.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  The returned IngestionManager will require its set_group_and_name method to be
 | 
	
		
			
				|  |  | +  called before its advance method may be called.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Args:
 | 
	
		
			
				|  |  | +    servicer: A base.Servicer for servicing the operation.
 | 
	
		
			
				|  |  | +    operation_context: A base.OperationContext for the operation to be passed to
 | 
	
		
			
				|  |  | +      the customer.
 | 
	
		
			
				|  |  | +    output_operator: A base.Operator for the operation to be passed to the
 | 
	
		
			
				|  |  | +      customer and to be called by the customer to accept operation data output
 | 
	
		
			
				|  |  | +      by the customer.
 | 
	
		
			
				|  |  | +    lock: The operation-wide lock.
 | 
	
		
			
				|  |  | +    pool: A thread pool in which to execute customer code.
 | 
	
		
			
				|  |  | +    termination_manager: The _interfaces.TerminationManager for the operation.
 | 
	
		
			
				|  |  | +    transmission_manager: The _interfaces.TransmissionManager for the
 | 
	
		
			
				|  |  | +      operation.
 | 
	
		
			
				|  |  | +    expiration_manager: The _interfaces.ExpirationManager for the operation.
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +  Returns:
 | 
	
		
			
				|  |  | +    An IngestionManager appropriate for service-side use.
 | 
	
		
			
				|  |  | +  """
 | 
	
		
			
				|  |  | +  subscription_creator = _ServiceSubscriptionCreator(
 | 
	
		
			
				|  |  | +      servicer, operation_context, output_operator)
 | 
	
		
			
				|  |  | +  return _IngestionManager(
 | 
	
		
			
				|  |  | +      lock, pool, None, subscription_creator, termination_manager,
 | 
	
		
			
				|  |  | +      transmission_manager, expiration_manager)
 |