"""TreeCache
:Maintainer: Jiangge Zhang <tonyseek@gmail.com>
:Maintainer: Haochuan Guo <guohaochuan@gmail.com>
:Maintainer: Tianwen Zhang <mail2tevin@gmail.com>
:Status: Alpha
A port of the Apache Curator's TreeCache recipe. It builds an in-memory cache
of a subtree in ZooKeeper and keeps it up-to-date.
See also: http://curator.apache.org/curator-recipes/tree-cache.html
"""
from __future__ import absolute_import
import os
import logging
import contextlib
import functools
import operator
from kazoo.exceptions import NoNodeError, KazooException
from kazoo.protocol.states import KazooState, EventType
# Module-level logger used by the cache classes and helpers in this module.
logger = logging.getLogger(__name__)
class TreeCache(object):
    """The cache of a ZooKeeper subtree.

    :param client: A :class:`~kazoo.client.KazooClient` instance.
    :param path: The root path of subtree.
    """

    STATE_LATENT = 0
    STATE_STARTED = 1
    STATE_CLOSED = 2

    def __init__(self, client, path):
        self._client = client
        self._root = TreeNode.make_root(self, path)
        self._state = self.STATE_LATENT
        # Count of asynchronous client calls whose results are still pending.
        self._outstanding_ops = 0
        self._is_initialized = False
        self._error_listeners = []
        self._event_listeners = []

    def start(self):
        """Starts the cache.

        The cache is not started automatically. You must call this method.

        After a cache started, all changes of subtree will be synchronized
        from the ZooKeeper server. Events will be fired for those activity.

        See also :meth:`~TreeCache.listen`.

        :raises KazooException: If the cache was already started or closed.

        .. note::

            This method is not thread safe.
        """
        if self._state == self.STATE_LATENT:
            self._state = self.STATE_STARTED
        elif self._state == self.STATE_CLOSED:
            raise KazooException('already closed')
        else:
            raise KazooException('already started')

        self._client.add_listener(self._session_watcher)
        self._client.ensure_path(self._root._path)

        # When not connected yet, the session watcher refreshes the tree
        # once the CONNECTED state is reported.
        if self._client.connected:
            self._root.on_created()

    def close(self):
        """Closes the cache.

        A closed cache was detached from ZooKeeper's changes. And all nodes
        will be invalidated.

        Once a tree cache was closed, it could not be started again. You should
        only close a tree cache while you want to recycle it.

        .. note::

            This method is not thread safe.
        """
        if self._state == self.STATE_STARTED:
            self._state = self.STATE_CLOSED
            self._client.remove_listener(self._session_watcher)
            with handle_exception(self._error_listeners):
                self._root.on_deleted()

    def listen(self, listener):
        """Registers a function to listen the cache events.

        The cache events are changes of local data. They are delivered from
        watching notifications in ZooKeeper session.

        This method can be use as a decorator.

        :param listener: A callable object which accepting a
                         :class:`~kazoo.recipe.cache.TreeEvent` instance as
                         its argument.
        :returns: The given ``listener``, unchanged.
        """
        self._event_listeners.append(listener)
        return listener

    def listen_fault(self, listener):
        """Registers a function to listen the exceptions.

        It is possible to meet some exceptions during the cache running. You
        could specific handlers for them.

        This method can be use as a decorator.

        :param listener: A callable object which accepting an exception as its
                         argument.
        :returns: The given ``listener``, unchanged.
        """
        self._error_listeners.append(listener)
        return listener

    def get_data(self, path, default=None):
        """Gets data of a node from cache.

        :param path: The absolute path string.
        :param default: The default value which will be returned if the node
                        does not exist.
        :raises ValueError: If the path is outside of this subtree.
        :returns: A :class:`~kazoo.recipe.cache.NodeData` instance.
        """
        node = self._find_node(path)
        return default if node is None else node._data

    def get_children(self, path, default=None):
        """Gets node children list from in-memory snapshot.

        :param path: The absolute path string.
        :param default: The default value which will be returned if the node
                        does not exist.
        :raises ValueError: If the path is outside of this subtree.
        :returns: The :class:`frozenset` which including children names.
        """
        node = self._find_node(path)
        return default if node is None else frozenset(node._children)

    def _find_node(self, path):
        """Walks the in-memory tree down to ``path``.

        :param path: The absolute path string.
        :raises ValueError: If the path is outside of this subtree.
        :returns: The matching node, or ``None`` if it is not cached.
        """
        root_path = self._root._path
        # Match on a path-component boundary. A bare ``startswith`` would
        # accept siblings such as ``/foobar`` for a cache rooted at ``/foo``
        # and then resolve them as the child ``bar``.
        inside_prefix = root_path.rstrip('/') + '/'
        if path != root_path and not path.startswith(inside_prefix):
            raise ValueError('outside of tree')

        striped_path = path[len(root_path):].strip('/')
        splited_path = [p for p in striped_path.split('/') if p]
        current_node = self._root
        for node_name in splited_path:
            if node_name not in current_node._children:
                return
            current_node = current_node._children[node_name]
        return current_node

    def _publish_event(self, event_type, event_data=None):
        """Queues an event for the listeners unless the cache is closed."""
        event = TreeEvent.make(event_type, event_data)
        if self._state != self.STATE_CLOSED:
            logger.debug('public event: %r', event)
            self._in_background(self._do_publish_event, event)

    def _do_publish_event(self, event):
        """Delivers ``event`` to every registered event listener."""
        for listener in self._event_listeners:
            # Each listener is guarded separately so that one failing
            # listener does not stop the delivery to the others.
            with handle_exception(self._error_listeners):
                listener(event)

    def _in_background(self, func, *args, **kwargs):
        """Puts a deferred call of ``func`` on the client callback queue."""
        self._client.handler.callback_queue.put(lambda: func(*args, **kwargs))

    def _session_watcher(self, state):
        """Translates connection state changes into cache events."""
        if state == KazooState.SUSPENDED:
            self._publish_event(TreeEvent.CONNECTION_SUSPENDED)
        elif state == KazooState.CONNECTED:
            with handle_exception(self._error_listeners):
                self._root.on_reconnected()
                self._publish_event(TreeEvent.CONNECTION_RECONNECTED)
        elif state == KazooState.LOST:
            # Allow INITIALIZED to be published again after the next refresh.
            self._is_initialized = False
            self._publish_event(TreeEvent.CONNECTION_LOST)
class TreeNode(object):
    """The tree node record.

    :param tree: A :class:`~kazoo.recipe.cache.TreeCache` instance.
    :param path: The path of current node.
    :param parent: The parent node reference. ``None`` for root node.
    """

    __slots__ = ('_tree', '_path', '_parent', '_depth', '_children', '_state',
                 '_data')

    STATE_PENDING = 0
    STATE_LIVE = 1
    STATE_DEAD = 2

    def __init__(self, tree, path, parent):
        self._tree = tree
        self._path = path
        self._parent = parent
        self._depth = parent._depth + 1 if parent else 0
        # Maps a child name to its TreeNode.
        self._children = {}
        self._state = self.STATE_PENDING
        self._data = None

    @classmethod
    def make_root(cls, tree, path):
        """Creates the parentless node for the root of ``tree``."""
        return cls(tree, path, None)

    def on_reconnected(self):
        """Refreshes this node and, recursively, every cached child."""
        self._refresh()
        for child in self._children.values():
            child.on_reconnected()

    def on_created(self):
        """Requests the data and the children of this node."""
        self._refresh()

    def on_deleted(self):
        """Invalidates this node together with its whole subtree.

        Children are invalidated first. Unless the tree is closed, the node
        is then marked dead, ``NODE_REMOVED`` is published if it was live,
        and the node is detached from its parent. A dead root node issues an
        ``exists`` call instead of being detached.
        """
        old_children, self._children = self._children, {}
        old_data, self._data = self._data, None

        for old_child in old_children.values():
            old_child.on_deleted()

        if self._tree._state == self._tree.STATE_CLOSED:
            return

        old_state, self._state = self._state, self.STATE_DEAD
        if old_state == self.STATE_LIVE:
            self._publish_event(TreeEvent.NODE_REMOVED, old_data)

        if self._parent is None:
            self._call_client('exists', self._path)  # root node
        else:
            # Key by the last path component. Slicing the path by
            # ``len(parent_path) + 1`` cuts one character too many when the
            # parent path is ``/`` (or ends with ``/``), which left the dead
            # node registered in its parent.
            child = self._path.rsplit('/', 1)[-1]
            if self._parent._children.get(child) is self:
                del self._parent._children[child]

    def _publish_event(self, *args, **kwargs):
        """Forwards to the ``_publish_event`` method of the owning tree."""
        return self._tree._publish_event(*args, **kwargs)

    def _refresh(self):
        self._refresh_data()
        self._refresh_children()

    def _refresh_data(self):
        self._call_client('get', self._path)

    def _refresh_children(self):
        # TODO max-depth checking support
        self._call_client('get_children', self._path)

    def _call_client(self, method_name, path, *args):
        """Invokes ``<method_name>_async`` on the client with a watch.

        The pending-operation counter of the tree is increased here and
        decreased again in :meth:`_process_result`.
        """
        self._tree._outstanding_ops += 1
        callback = functools.partial(
            self._tree._in_background, self._process_result,
            method_name, path)
        kwargs = {'watch': self._process_watch}
        method = getattr(self._tree._client, method_name + '_async')
        method(path, *args, **kwargs).rawlink(callback)

    def _process_watch(self, watched_event):
        """Reacts to a watch notification for this node."""
        logger.debug('process_watch: %r', watched_event)
        with handle_exception(self._tree._error_listeners):
            if watched_event.type == EventType.CREATED:
                assert self._parent is None, 'unexpected CREATED on non-root'
                self.on_created()
            elif watched_event.type == EventType.DELETED:
                self.on_deleted()
            elif watched_event.type == EventType.CHANGED:
                self._refresh_data()
            elif watched_event.type == EventType.CHILD:
                self._refresh_children()

    def _process_result(self, method_name, path, result):
        """Applies the result of an asynchronous client call to this node.

        :param method_name: ``'exists'``, ``'get'`` or ``'get_children'``.
        :param path: The path the call was issued for.
        :param result: The async result object; its ``get()`` returns the
                       value or raises the error of the call.
        """
        logger.debug('process_result: %s %s', method_name, path)
        try:
            if method_name == 'exists':
                assert self._parent is None, 'unexpected EXISTS on non-root'
                # the value of result will be set with `None` if node not
                # exists.
                if result.get() is not None:
                    if self._state == self.STATE_DEAD:
                        self._state = self.STATE_PENDING
                    self.on_created()
            elif method_name == 'get_children':
                try:
                    children = result.get()
                except NoNodeError:
                    self.on_deleted()
                else:
                    # ZooKeeper paths always use '/'; ``os.path.join`` would
                    # insert the platform separator instead.
                    prefix = path.rstrip('/') + '/'
                    for child in sorted(children):
                        if child not in self._children:
                            node = TreeNode(self._tree, prefix + child, self)
                            self._children[child] = node
                            node.on_created()
            elif method_name == 'get':
                try:
                    data, stat = result.get()
                except NoNodeError:
                    self.on_deleted()
                else:
                    old_data, self._data = (
                        self._data, NodeData.make(path, data, stat))
                    old_state, self._state = self._state, self.STATE_LIVE
                    if old_state == self.STATE_LIVE:
                        if (old_data is None or
                                old_data.stat.mzxid != stat.mzxid):
                            self._publish_event(
                                TreeEvent.NODE_UPDATED, self._data)
                    else:
                        self._publish_event(TreeEvent.NODE_ADDED, self._data)
            else:  # pragma: no cover
                logger.warning('unknown operation %s', method_name)
                return
        finally:
            # Always balance the increment done in ``_call_client``, even
            # when the result carries an unexpected error; otherwise the
            # counter could never reach zero again.
            self._tree._outstanding_ops -= 1

        if self._tree._outstanding_ops == 0 and not self._tree._is_initialized:
            self._tree._is_initialized = True
            self._publish_event(TreeEvent.INITIALIZED)
class TreeEvent(tuple):
    """The immutable event tuple of cache.

    The tuple holds ``(event_type, event_data)``.
    """

    NODE_ADDED = 0
    NODE_UPDATED = 1
    NODE_REMOVED = 2
    CONNECTION_SUSPENDED = 3
    CONNECTION_RECONNECTED = 4
    CONNECTION_LOST = 5
    INITIALIZED = 6

    @property
    def event_type(self):
        """An enumerate integer to indicate event type."""
        return self[0]

    @property
    def event_data(self):
        """The payload attached to the event (second tuple item)."""
        return self[1]

    @classmethod
    def make(cls, event_type, event_data):
        """Creates a new TreeEvent tuple.

        :param event_type: One of the integer constants of this class.
        :param event_data: The payload to carry along with the event.
        :returns: A :class:`~kazoo.recipe.cache.TreeEvent` instance.
        """
        known_types = (
            cls.NODE_ADDED, cls.NODE_UPDATED, cls.NODE_REMOVED,
            cls.CONNECTION_SUSPENDED, cls.CONNECTION_RECONNECTED,
            cls.CONNECTION_LOST, cls.INITIALIZED)
        assert event_type in known_types
        payload = (event_type, event_data)
        return cls(payload)
class NodeData(tuple):
    """The immutable node data tuple of cache.

    The tuple holds ``(path, data, stat)``.
    """

    @property
    def path(self):
        """The absolute path string of current node."""
        return self[0]

    @property
    def data(self):
        """The bytes data of current node."""
        return self[1]

    @property
    def stat(self):
        """The stat information of current node."""
        return self[2]

    @classmethod
    def make(cls, path, data, stat):
        """Creates a new NodeData tuple.

        :returns: A :class:`~kazoo.recipe.cache.NodeData` instance.
        """
        record = (path, data, stat)
        return cls(record)
@contextlib.contextmanager
def handle_exception(listeners):
    """Routes exceptions raised inside the ``with`` body to ``listeners``.

    Any :class:`Exception` raised by the body is suppressed and passed to
    each listener in order. When there is no listener at all, the exception
    is logged instead.

    :param listeners: A collection of callables, each accepting the raised
                      exception as its only argument.
    """
    try:
        yield
    except Exception as e:
        logger.debug('processing error: %r', e)
        if listeners:
            for listener in listeners:
                try:
                    listener(e)
                except Exception:  # pragma: no cover
                    # A failing listener must not prevent the remaining
                    # listeners from being notified.
                    logger.exception('Exception handling exception')  # oops
        else:
            # A ``for ... else`` without ``break`` always ran this branch,
            # so the message was logged even after listeners handled the
            # error; only report when nobody is listening.
            logger.exception('No listener to process %r', e)