"""Zookeeper Partitioner Implementation
:Maintainer: None
:Status: Unknown
:class:`SetPartitioner` implements a partitioning scheme using
Zookeeper for dividing up resources amongst members of a party.
This is useful when there is a set of resources that should only be
accessed by a single process at a time that multiple processes
across a cluster might want to divide up.
Example Use-Case
----------------
- Multiple workers across a cluster need to divide up a list of queues
so that no two workers own the same queue.
"""
import logging
import os
import socket
from functools import partial
from kazoo.exceptions import KazooException, LockTimeout
from kazoo.protocol.states import KazooState
from kazoo.recipe.watchers import PatientChildrenWatch
log = logging.getLogger(__name__)
[docs]class PartitionState(object):
"""High level partition state values
.. attribute:: ALLOCATING
The set needs to be partitioned, and may require an existing
partition set to be released before acquiring a new partition
of the set.
.. attribute:: ACQUIRED
The set has been partitioned and acquired.
.. attribute:: RELEASE
The set needs to be repartitioned, and the current partitions
must be released before a new allocation can be made.
.. attribute:: FAILURE
The set partition has failed. This occurs when the maximum
time to partition the set is exceeded or the Zookeeper session
is lost. The partitioner is unusable after this state and must
be recreated.
"""
ALLOCATING = "ALLOCATING"
ACQUIRED = "ACQUIRED"
RELEASE = "RELEASE"
FAILURE = "FAILURE"
[docs]class SetPartitioner(object):
"""Partitions a set amongst members of a party
This class will partition a set amongst members of a party such
that each member will be given zero or more items of the set and
each set item will be given to a single member. When new members
enter or leave the party, the set will be re-partitioned amongst
the members.
When the :class:`SetPartitioner` enters the
:attr:`~PartitionState.FAILURE` state, it is unrecoverable
and a new :class:`SetPartitioner` should be created.
Example:
.. code-block:: python
from kazoo.client import KazooClient
client = KazooClient()
qp = client.SetPartitioner(
path='/work_queues', set=('queue-1', 'queue-2', 'queue-3'))
while 1:
if qp.failed:
raise Exception("Lost or unable to acquire partition")
elif qp.release:
qp.release_set()
elif qp.acquired:
for partition in qp:
# Do something with each partition
elif qp.allocating:
qp.wait_for_acquire()
**State Transitions**
When created, the :class:`SetPartitioner` enters the
:attr:`PartitionState.ALLOCATING` state.
:attr:`~PartitionState.ALLOCATING` ->
:attr:`~PartitionState.ACQUIRED`
Set was partitioned successfully, the partition list assigned
is accessible via list/iter methods or calling list() on the
:class:`SetPartitioner` instance.
:attr:`~PartitionState.ALLOCATING` ->
:attr:`~PartitionState.FAILURE`
Allocating the set failed either due to a Zookeeper session
expiration, or failure to acquire the items of the set within
the timeout period.
:attr:`~PartitionState.ACQUIRED` ->
:attr:`~PartitionState.RELEASE`
The members of the party have changed, and the set needs to be
repartitioned. :meth:`SetPartitioner.release` should be called
as soon as possible.
:attr:`~PartitionState.ACQUIRED` ->
:attr:`~PartitionState.FAILURE`
The current partition was lost due to a Zookeeper session
expiration.
:attr:`~PartitionState.RELEASE` ->
:attr:`~PartitionState.ALLOCATING`
The current partition was released and is being re-allocated.
"""
[docs] def __init__(self, client, path, set, partition_func=None,
identifier=None, time_boundary=30, max_reaction_time=1,
state_change_event=None):
"""Create a :class:`~SetPartitioner` instance
:param client: A :class:`~kazoo.client.KazooClient` instance.
:param path: The partition path to use.
:param set: The set of items to partition.
:param partition_func: A function to use to decide how to
partition the set.
:param identifier: An identifier to use for this member of the
party when participating. Defaults to the
hostname + process id.
:param time_boundary: How long the party members must be stable
before allocation can complete.
:param max_reaction_time: Maximum reaction time for party members
change.
:param state_change_event: An optional Event object that will be set
on every state change.
"""
# Used to differentiate two states with the same names in time
self.state_id = 0
self.state = PartitionState.ALLOCATING
self.state_change_event = state_change_event or \
client.handler.event_object()
self._client = client
self._path = path
self._set = set
self._partition_set = []
self._partition_func = partition_func or self._partitioner
self._identifier = identifier or '%s-%s' % (
socket.getfqdn(), os.getpid())
self._locks = []
self._lock_path = '/'.join([path, 'locks'])
self._party_path = '/'.join([path, 'party'])
self._time_boundary = time_boundary
self._max_reaction_time = max_reaction_time
self._acquire_event = client.handler.event_object()
# Create basic path nodes
client.ensure_path(path)
client.ensure_path(self._lock_path)
client.ensure_path(self._party_path)
# Join the party
self._party = client.ShallowParty(self._party_path,
identifier=self._identifier)
self._party.join()
self._state_change = client.handler.rlock_object()
client.add_listener(self._establish_sessionwatch)
# Now watch the party and set the callback on the async result
# so we know when we're ready
self._child_watching(self._allocate_transition, async=True)
def __iter__(self):
"""Return the partitions in this partition set"""
for partition in self._partition_set:
yield partition
@property
[docs] def failed(self):
"""Corresponds to the :attr:`PartitionState.FAILURE` state"""
return self.state == PartitionState.FAILURE
@property
[docs] def release(self):
"""Corresponds to the :attr:`PartitionState.RELEASE` state"""
return self.state == PartitionState.RELEASE
@property
[docs] def allocating(self):
"""Corresponds to the :attr:`PartitionState.ALLOCATING`
state"""
return self.state == PartitionState.ALLOCATING
@property
[docs] def acquired(self):
"""Corresponds to the :attr:`PartitionState.ACQUIRED` state"""
return self.state == PartitionState.ACQUIRED
[docs] def wait_for_acquire(self, timeout=30):
"""Wait for the set to be partitioned and acquired
:param timeout: How long to wait before returning.
:type timeout: int
"""
self._acquire_event.wait(timeout)
[docs] def release_set(self):
"""Call to release the set
This method begins the step of allocating once the set has
been released.
"""
self._release_locks()
if self._locks: # pragma: nocover
# This shouldn't happen, it means we couldn't release our
# locks, abort
self._fail_out()
return
else:
with self._state_change:
if self.failed:
return
self._set_state(PartitionState.ALLOCATING)
self._child_watching(self._allocate_transition, async=True)
[docs] def finish(self):
"""Call to release the set and leave the party"""
self._release_locks()
self._fail_out()
def _fail_out(self):
with self._state_change:
self._set_state(PartitionState.FAILURE)
if self._party.participating:
try:
self._party.leave()
except KazooException: # pragma: nocover
pass
def _allocate_transition(self, result):
"""Called when in allocating mode, and the children settled"""
# Did we get an exception waiting for children to settle?
if result.exception: # pragma: nocover
self._fail_out()
return
children, async_result = result.get()
children_changed = self._client.handler.event_object()
def updated(result):
with self._state_change:
children_changed.set()
if self.acquired:
self._set_state(PartitionState.RELEASE)
with self._state_change:
# We can lose connection during processing the event
if not self.allocating:
return
# Remember the state ID to check later for race conditions
state_id = self.state_id
# updated() will be called when children change
async_result.rawlink(updated)
# Check whether the state has changed during the lock acquisition
# and abort the process if so.
def abort_if_needed():
if self.state_id == state_id:
if children_changed.is_set():
# The party has changed. Repartitioning...
self._abort_lock_acquisition()
return True
else:
return False
else:
if self.allocating or self.acquired:
# The connection was lost and user initiated a new
# allocation process. Abort it to eliminate race
# conditions with locks.
with self._state_change:
self._set_state(PartitionState.RELEASE)
return True
# Split up the set
partition_set = self._partition_func(
self._identifier, list(self._party), self._set)
# Proceed to acquire locks for the working set as needed
for member in partition_set:
lock = self._client.Lock(self._lock_path + '/' + str(member))
while True:
try:
# We mustn't lock without timeout because in that case we
# can get a deadlock if the party state will change during
# lock acquisition.
lock.acquire(timeout=self._max_reaction_time)
except LockTimeout:
if abort_if_needed():
return
except KazooException:
return self.finish()
else:
break
self._locks.append(lock)
if abort_if_needed():
return
# All locks acquired. Time for state transition.
with self._state_change:
if self.state_id == state_id and not children_changed.is_set():
self._partition_set = partition_set
self._set_state(PartitionState.ACQUIRED)
self._acquire_event.set()
return
if not abort_if_needed():
# This mustn't happen. Means a logical error.
self._fail_out()
def _release_locks(self):
"""Attempt to completely remove all the locks"""
self._acquire_event.clear()
for lock in self._locks[:]:
try:
lock.release()
except KazooException: # pragma: nocover
# We proceed to remove as many as possible, and leave
# the ones we couldn't remove
pass
else:
self._locks.remove(lock)
def _abort_lock_acquisition(self):
"""Called during lock acquisition if a party change occurs"""
self._release_locks()
if self._locks:
# This shouldn't happen, it means we couldn't release our
# locks, abort
self._fail_out()
return
self._child_watching(self._allocate_transition, async=True)
def _child_watching(self, func=None, async=False):
"""Called when children are being watched to stabilize
This actually returns immediately, child watcher spins up a
new thread/greenlet and waits for it to stabilize before
any callbacks might run.
"""
watcher = PatientChildrenWatch(self._client, self._party_path,
self._time_boundary)
asy = watcher.start()
if func is not None:
# We spin up the function in a separate thread/greenlet
# to ensure that the rawlink's it might use won't be
# blocked
if async:
func = partial(self._client.handler.spawn, func)
asy.rawlink(func)
return asy
def _establish_sessionwatch(self, state):
"""Register ourself to listen for session events, we shut down
if we become lost"""
with self._state_change:
if self.failed:
pass
elif state == KazooState.LOST:
self._client.handler.spawn(self._fail_out)
elif not self.release:
self._set_state(PartitionState.RELEASE)
return state == KazooState.LOST
def _partitioner(self, identifier, members, partitions):
# Ensure consistent order of partitions/members
all_partitions = sorted(partitions)
workers = sorted(members)
i = workers.index(identifier)
# Now return the partition list starting at our location and
# skipping the other workers
return all_partitions[i::len(workers)]
def _set_state(self, state):
self.state = state
self.state_id += 1
self.state_change_event.set()