# Source code for kazoo.recipe.partitioner

"""Zookeeper Partitioner Implementation

:Maintainer: None
:Status: Unknown

:class:`SetPartitioner` implements a partitioning scheme using
Zookeeper for dividing up resources amongst members of a party.

This is useful when there is a set of resources that should only be
accessed by a single process at a time that multiple processes
across a cluster might want to divide up.

Example Use-Case
----------------

- Multiple workers across a cluster need to divide up a list of queues
  so that no two workers own the same queue.

"""
import logging
import os
import socket
from functools import partial

from kazoo.exceptions import KazooException
from kazoo.protocol.states import KazooState
from kazoo.recipe.watchers import PatientChildrenWatch

# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)


class PartitionState(object):
    """High level partition state values

    The values are plain strings; :class:`SetPartitioner` stores one of
    them in its ``state`` attribute and compares against them.

    .. attribute:: ALLOCATING

        The set needs to be partitioned, and may require an existing
        partition set to be released before acquiring a new partition
        of the set.

    .. attribute:: ACQUIRED

        The set has been partitioned and acquired.

    .. attribute:: RELEASE

        The set needs to be repartitioned, and the current partitions
        must be released before a new allocation can be made.

    .. attribute:: FAILURE

        The set partition has failed. This occurs when the maximum
        time to partition the set is exceeded or the Zookeeper session
        is lost. The partitioner is unusable after this state and must
        be recreated.

    """
    ALLOCATING = "ALLOCATING"
    ACQUIRED = "ACQUIRED"
    RELEASE = "RELEASE"
    FAILURE = "FAILURE"
class SetPartitioner(object):
    """Partitions a set amongst members of a party

    This class will partition a set amongst members of a party such
    that each member will be given zero or more items of the set and
    each set item will be given to a single member. When new members
    enter or leave the party, the set will be re-partitioned amongst
    the members.

    When the :class:`SetPartitioner` enters the
    :attr:`~PartitionState.FAILURE` state, it is unrecoverable
    and a new :class:`SetPartitioner` should be created.

    Example:

    .. code-block:: python

        from kazoo.client import KazooClient
        client = KazooClient()

        qp = client.SetPartitioner(
            path='/work_queues', set=('queue-1', 'queue-2', 'queue-3'))

        while 1:
            if qp.failed:
                raise Exception("Lost or unable to acquire partition")
            elif qp.release:
                qp.release_set()
            elif qp.acquired:
                for partition in qp:
                    # Do something with each partition
            elif qp.allocating:
                qp.wait_for_acquire()

    **State Transitions**

    When created, the :class:`SetPartitioner` enters the
    :attr:`PartitionState.ALLOCATING` state.

    :attr:`~PartitionState.ALLOCATING` ->
    :attr:`~PartitionState.ACQUIRED`

        Set was partitioned successfully, the partition list assigned
        is accessible via list/iter methods or calling list() on the
        :class:`SetPartitioner` instance.

    :attr:`~PartitionState.ALLOCATING` ->
    :attr:`~PartitionState.FAILURE`

        Allocating the set failed either due to a Zookeeper session
        expiration, or failure to acquire the items of the set within
        the timeout period.

    :attr:`~PartitionState.ACQUIRED` ->
    :attr:`~PartitionState.RELEASE`

        The members of the party have changed, and the set needs to be
        repartitioned. :meth:`SetPartitioner.release_set` should be
        called as soon as possible.

    :attr:`~PartitionState.ACQUIRED` ->
    :attr:`~PartitionState.FAILURE`

        The current partition was lost due to a Zookeeper session
        expiration.

    :attr:`~PartitionState.RELEASE` ->
    :attr:`~PartitionState.ALLOCATING`

        The current partition was released and is being re-allocated.

    """

    def __init__(self, client, path, set, partition_func=None,
                 identifier=None, time_boundary=30):
        """Create a :class:`~SetPartitioner` instance

        Besides storing its arguments, construction ensures the
        partition, lock and party paths exist, joins the party,
        registers a session listener on the client and starts watching
        the party so allocation can begin.

        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The partition path to use.
        :param set: The set of items to partition.
        :param partition_func: A function to use to decide how to
                               partition the set. It is called as
                               ``partition_func(identifier, members,
                               set)``; defaults to the built-in
                               :meth:`_partitioner`.
        :param identifier: An identifier to use for this member of the
                           party when participating. Defaults to the
                           hostname + process id.
        :param time_boundary: How long the party members must be
                              stable before allocation can complete.

        """
        self.state = PartitionState.ALLOCATING

        self._client = client
        self._path = path
        self._set = set
        self._partition_set = []
        self._partition_func = partition_func or self._partitioner
        self._identifier = identifier or '%s-%s' % (
            socket.getfqdn(), os.getpid())
        self._locks = []
        self._lock_path = '/'.join([path, 'locks'])
        self._party_path = '/'.join([path, 'party'])
        self._time_boundary = time_boundary

        self._acquire_event = client.handler.event_object()

        # Create basic path nodes
        client.ensure_path(path)
        client.ensure_path(self._lock_path)
        client.ensure_path(self._party_path)

        # Join the party
        self._party = client.ShallowParty(self._party_path,
                                          identifier=self._identifier)
        self._party.join()

        # Set when a connection suspension interrupts an ACQUIRED state,
        # so the state can be restored on reconnect.
        self._was_allocated = False
        self._state_change = client.handler.rlock_object()
        client.add_listener(self._establish_sessionwatch)

        # Now watch the party and set the callback on the async result
        # so we know when we're ready
        self._children_updated = False
        self._child_watching(self._allocate_transition, client_handler=True)

    def __iter__(self):
        """Return the partitions in this partition set"""
        for partition in self._partition_set:
            yield partition

    @property
    def failed(self):
        """Corresponds to the :attr:`PartitionState.FAILURE` state"""
        return self.state == PartitionState.FAILURE

    @property
    def release(self):
        """Corresponds to the :attr:`PartitionState.RELEASE` state"""
        return self.state == PartitionState.RELEASE

    @property
    def allocating(self):
        """Corresponds to the :attr:`PartitionState.ALLOCATING`
        state"""
        return self.state == PartitionState.ALLOCATING

    @property
    def acquired(self):
        """Corresponds to the :attr:`PartitionState.ACQUIRED` state"""
        return self.state == PartitionState.ACQUIRED

    def wait_for_acquire(self, timeout=30):
        """Wait for the set to be partitioned and acquired

        :param timeout: How long to wait before returning.
        :type timeout: int

        """
        self._acquire_event.wait(timeout)

    def release_set(self):
        """Call to release the set

        This method begins the step of allocating once the set has
        been released.

        """
        self._release_locks()
        if self._locks:  # pragma: nocover
            # This shouldn't happen, it means we couldn't release our
            # locks, abort
            self._fail_out()
            return
        else:
            with self._state_change:
                if self.failed:
                    return
                self.state = PartitionState.ALLOCATING
            self._child_watching(self._allocate_transition,
                                 client_handler=True)

    def finish(self):
        """Call to release the set and leave the party"""
        self._release_locks()
        self._fail_out()

    def _fail_out(self):
        """Enter the FAILURE state and leave the party if participating"""
        with self._state_change:
            self.state = PartitionState.FAILURE
        if self._party.participating:
            try:
                self._party.leave()
            except KazooException:  # pragma: nocover
                # Best effort: we are already in FAILURE, so an error
                # while leaving the party is deliberately ignored.
                pass

    def _allocate_transition(self, result):
        """Called when in allocating mode, and the children settled

        :param result: The async result handed to the callback by the
                       children watch; ``result.get()`` yields the
                       children and a second async result that fires
                       when the children change.

        """
        # Did we get an exception waiting for children to settle?
        if result.exception:  # pragma: nocover
            self._fail_out()
            return

        children, async_result = result.get()
        self._children_updated = False

        # Add a callback when children change on the async_result
        def updated(result):
            with self._state_change:
                if self.acquired:
                    self.state = PartitionState.RELEASE
            self._children_updated = True

        async_result.rawlink(updated)

        # Split up the set
        self._partition_set = self._partition_func(
            self._identifier, list(self._party), self._set)

        # Proceed to acquire locks for the working set as needed
        for member in self._partition_set:
            if self._children_updated or self.failed:
                # Still haven't settled down, release locks acquired
                # so far and go back
                return self._abort_lock_acquisition()

            lock = self._client.Lock(self._lock_path + '/' +
                                     str(member))
            try:
                lock.acquire()
            except KazooException:  # pragma: nocover
                return self.finish()
            self._locks.append(lock)

        # All locks acquired! Time for state transition, make sure
        # we didn't inadvertently get lost thus far
        with self._state_change:
            if self.failed:  # pragma: nocover
                return self.finish()
            self.state = PartitionState.ACQUIRED
            self._acquire_event.set()

    def _release_locks(self):
        """Attempt to completely remove all the locks"""
        self._acquire_event.clear()
        # Iterate over a copy since released locks are removed from
        # the list as we go.
        for lock in self._locks[:]:
            try:
                lock.release()
            except KazooException:  # pragma: nocover
                # We proceed to remove as many as possible, and leave
                # the ones we couldn't remove
                pass
            else:
                self._locks.remove(lock)

    def _abort_lock_acquisition(self):
        """Called during lock acquisition if a party change occurs"""
        self._partition_set = []
        self._release_locks()
        if self._locks:
            # This shouldn't happen, it means we couldn't release our
            # locks, abort
            self._fail_out()
            return
        return self._child_watching(self._allocate_transition)

    def _child_watching(self, func=None, client_handler=False):
        """Called when children are being watched to stabilize

        This actually returns immediately, child watcher spins up a
        new thread/greenlet and waits for it to stabilize before
        any callbacks might run.

        :param func: Optional callback linked to the watcher's async
                     result via ``rawlink``.
        :param client_handler: When true, ``func`` is run through the
                               client handler's ``spawn`` instead of
                               being called directly. (This flag was
                               formerly named ``async``, which is a
                               reserved keyword since Python 3.7.)
        :returns: The async result returned by the watcher's
                  ``start()``.

        """
        watcher = PatientChildrenWatch(self._client, self._party_path,
                                       self._time_boundary)
        asy = watcher.start()
        if func is not None:
            # We spin up the function in a separate thread/greenlet
            # to ensure that the rawlink's it might use won't be
            # blocked
            if client_handler:
                func = partial(self._client.handler.spawn, func)
            asy.rawlink(func)
        return asy

    def _establish_sessionwatch(self, state):
        """Register ourself to listen for session events, we shut down
        if we become lost

        :param state: The connection state passed to the listener,
                      compared against :class:`KazooState` values.
        :returns: ``True`` when the session is LOST, otherwise
                  ``None``.

        """
        with self._state_change:
            # Handle network partition: If connection gets suspended,
            # change state to ALLOCATING if we had already ACQUIRED.
            # This way the caller does not process the members since we
            # could eventually lose session get repartitioned. If we got
            # connected after a suspension it means we've not lost the
            # session and still have our members. Hence, restore to
            # ACQUIRED
            if state == KazooState.SUSPENDED:
                if self.state == PartitionState.ACQUIRED:
                    self._was_allocated = True
                    self.state = PartitionState.ALLOCATING
            elif state == KazooState.CONNECTED:
                if self._was_allocated:
                    self._was_allocated = False
                    self.state = PartitionState.ACQUIRED

        if state == KazooState.LOST:
            self._client.handler.spawn(self._fail_out)
            return True

    def _partitioner(self, identifier, members, partitions):
        """Default partition function

        Sorts both the partitions and the members, then returns every
        ``len(members)``-th partition starting at this member's index
        in the sorted member list.

        :param identifier: This member's identifier; must be present
                           in ``members`` (``list.index`` raises
                           :exc:`ValueError` otherwise).
        :param members: Identifiers of all party members.
        :param partitions: The items to divide up.
        :returns: The list of partitions assigned to ``identifier``.

        """
        # Ensure consistent order of partitions/members
        all_partitions = sorted(partitions)
        workers = sorted(members)

        i = workers.index(identifier)
        # Now return the partition list starting at our location and
        # skipping the other workers
        return all_partitions[i::len(workers)]