Source code for kazoo.recipe.queue

"""Zookeeper based queue implementations.

:Maintainer: None
:Status: Unknown

"""

import uuid
from kazoo.exceptions import NoNodeError, NodeExistsError
from kazoo.retry import ForceRetryError
from kazoo.protocol.states import EventType


class BaseQueue(object):
    """A common base class for queue implementations."""

    def __init__(self, client, path):
        """
        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The queue path to use in ZooKeeper.
        """
        self.client = client
        self.path = path
        self._entries_path = path
        self.structure_paths = (self.path, )
        self.ensured_path = False

    def _check_put_arguments(self, value, priority=100):
        if not isinstance(value, bytes):
            raise TypeError("value must be a byte string")
        if not isinstance(priority, int):
            raise TypeError("priority must be an int")
        elif priority < 0 or priority > 999:
            raise ValueError("priority must be between 0 and 999")

    def _ensure_paths(self):
        if not self.ensured_path:
            # make sure our parent / internal structure nodes exists
            for path in self.structure_paths:
                self.client.ensure_path(path)
            self.ensured_path = True

    def __len__(self):
        self._ensure_paths()
        _, stat = self.client.retry(self.client.get, self._entries_path)
        return stat.children_count


[docs]class Queue(BaseQueue): """A distributed queue with optional priority support. This queue does not offer reliable consumption. An entry is removed from the queue prior to being processed. So if an error occurs, the consumer has to re-queue the item or it will be lost. """ prefix = "entry-"
[docs] def __init__(self, client, path): """ :param client: A :class:`~kazoo.client.KazooClient` instance. :param path: The queue path to use in ZooKeeper. """ super(Queue, self).__init__(client, path) self._children = []
[docs] def __len__(self): """Return queue size.""" return super(Queue, self).__len__()
[docs] def get(self): """ Get item data and remove an item from the queue. :returns: Item data or None. :rtype: bytes """ self._ensure_paths() return self.client.retry(self._inner_get)
def _inner_get(self): if not self._children: self._children = self.client.retry(self.client.get_children, self.path) self._children = sorted(self._children) if not self._children: return None name = self._children[0] try: data, stat = self.client.get(self.path + "/" + name) except NoNodeError: # pragma: nocover # the first node has vanished in the meantime, try to # get another one raise ForceRetryError() try: self.client.delete(self.path + "/" + name) except NoNodeError: # pragma: nocover # we were able to get the data but someone else has removed # the node in the meantime. consider the item as processed # by the other process raise ForceRetryError() self._children.pop(0) return data
[docs] def put(self, value, priority=100): """Put an item into the queue. :param value: Byte string to put into the queue. :param priority: An optional priority as an integer with at most 3 digits. Lower values signify higher priority. """ self._check_put_arguments(value, priority) self._ensure_paths() path = '{path}/{prefix}{priority:03d}-'.format( path=self.path, prefix=self.prefix, priority=priority) self.client.create(path, value, sequence=True)
[docs]class LockingQueue(BaseQueue): """A distributed queue with priority and locking support. Upon retrieving an entry from the queue, the entry gets locked with an ephemeral node (instead of deleted). If an error occurs, this lock gets released so that others could retake the entry. This adds a little penalty as compared to :class:`Queue` implementation. The user should call the :meth:`LockingQueue.get` method first to lock and retrieve the next entry. When finished processing the entry, a user should call the :meth:`LockingQueue.consume` method that will remove the entry from the queue. This queue will not track connection status with ZooKeeper. If a node locks an element, then loses connection with ZooKeeper and later reconnects, the lock will probably be removed by Zookeeper in the meantime, but a node would still think that it holds a lock. The user should check the connection status with Zookeeper or call :meth:`LockingQueue.holds_lock` method that will check if a node still holds the lock. .. note:: :class:`LockingQueue` requires ZooKeeper 3.4 or above, since it is using transactions. """ lock = "/taken" entries = "/entries" entry = "entry"
[docs] def __init__(self, client, path): """ :param client: A :class:`~kazoo.client.KazooClient` instance. :param path: The queue path to use in ZooKeeper. """ super(LockingQueue, self).__init__(client, path) self.id = uuid.uuid4().hex.encode() self.processing_element = None self._lock_path = self.path + self.lock self._entries_path = self.path + self.entries self.structure_paths = (self._lock_path, self._entries_path)
[docs] def __len__(self): """Returns the current length of the queue. :returns: queue size (includes locked entries count). """ return super(LockingQueue, self).__len__()
[docs] def put(self, value, priority=100): """Put an entry into the queue. :param value: Byte string to put into the queue. :param priority: An optional priority as an integer with at most 3 digits. Lower values signify higher priority. """ self._check_put_arguments(value, priority) self._ensure_paths() self.client.create( "{path}/{prefix}-{priority:03d}-".format( path=self._entries_path, prefix=self.entry, priority=priority), value, sequence=True)
[docs] def put_all(self, values, priority=100): """Put several entries into the queue. The action only succeeds if all entries where put into the queue. :param values: A list of values to put into the queue. :param priority: An optional priority as an integer with at most 3 digits. Lower values signify higher priority. """ if not isinstance(values, list): raise TypeError("values must be a list of byte strings") if not isinstance(priority, int): raise TypeError("priority must be an int") elif priority < 0 or priority > 999: raise ValueError("priority must be between 0 and 999") self._ensure_paths() with self.client.transaction() as transaction: for value in values: if not isinstance(value, bytes): raise TypeError("value must be a byte string") transaction.create( "{path}/{prefix}-{priority:03d}-".format( path=self._entries_path, prefix=self.entry, priority=priority), value, sequence=True)
[docs] def get(self, timeout=None): """Locks and gets an entry from the queue. If a previously got entry was not consumed, this method will return that entry. :param timeout: Maximum waiting time in seconds. If None then it will wait untill an entry appears in the queue. :returns: A locked entry value or None if the timeout was reached. :rtype: bytes """ self._ensure_paths() if not self.processing_element is None: return self.processing_element[1] else: return self._inner_get(timeout)
[docs] def holds_lock(self): """Checks if a node still holds the lock. :returns: True if a node still holds the lock, False otherwise. :rtype: bool """ if self.processing_element is None: return False lock_id, _ = self.processing_element lock_path = "{path}/{id}".format(path=self._lock_path, id=lock_id) self.client.sync(lock_path) value, stat = self.client.retry(self.client.get, lock_path) return value == self.id
[docs] def consume(self): """Removes a currently processing entry from the queue. :returns: True if element was removed successfully, False otherwise. :rtype: bool """ if not self.processing_element is None and self.holds_lock: id_, value = self.processing_element with self.client.transaction() as transaction: transaction.delete("{path}/{id}".format( path=self._entries_path, id=id_)) transaction.delete("{path}/{id}".format( path=self._lock_path, id=id_)) self.processing_element = None return True else: return False
def _inner_get(self, timeout): flag = self.client.handler.event_object() lock = self.client.handler.lock_object() canceled = False value = [] def check_for_updates(event): if not event is None and event.type != EventType.CHILD: return with lock: if canceled or flag.isSet(): return values = self.client.retry(self.client.get_children, self._entries_path, check_for_updates) taken = self.client.retry(self.client.get_children, self._lock_path, check_for_updates) available = self._filter_locked(values, taken) if len(available) > 0: ret = self._take(available[0]) if not ret is None: # By this time, no one took the task value.append(ret) flag.set() check_for_updates(None) retVal = None flag.wait(timeout) with lock: canceled = True if len(value) > 0: # We successfully locked an entry self.processing_element = value[0] retVal = value[0][1] return retVal def _filter_locked(self, values, taken): taken = set(taken) available = sorted(values) return (available if len(taken) == 0 else [x for x in available if x not in taken]) def _take(self, id_): try: self.client.create( "{path}/{id}".format( path=self._lock_path, id=id_), self.id, ephemeral=True) value, stat = self.client.retry(self.client.get, "{path}/{id}".format(path=self._entries_path, id=id_)) except (NoNodeError, NodeExistsError): # Item is already consumed or locked return None return (id_, value)