This document describes the current stable version of Kombu (4.0). For development docs, go here.

Source code for kombu.transport.zookeeper

"""Zookeeper transport.

:copyright: (c) 2010 - 2013 by Mahendra M.
:license: BSD, see LICENSE for more details.

**Synopsis**

Connects to a ZooKeeper node as ``<server>:<port>/<vhost>``.
The ``<vhost>`` becomes the base path for all the other znodes, so it
can be used like a virtual host.

This uses the built-in kazoo recipe for queues.

**References**

- https://zookeeper.apache.org/doc/trunk/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html

**Limitations**
This queue does not offer reliable consumption.  An entry is removed from
the queue prior to being processed.  So if an error occurs, the consumer
has to re-queue the item or it will be lost.
"""
from __future__ import absolute_import, unicode_literals

import os
import socket

from kombu.five import Empty
from kombu.utils.encoding import bytes_to_str
from kombu.utils.json import loads, dumps

from . import virtual

# kazoo is treated as an optional dependency.  When it can be imported, the
# kazoo exception classes are grouped into two tuples that the Transport
# class appends to its ``connection_errors`` and ``channel_errors``.  When
# the import fails, ``kazoo`` is set to None (Transport.__init__ checks this
# and raises ImportError) and both tuples are empty.
try:
    import kazoo
    from kazoo.client import KazooClient
    from kazoo.recipe.queue import Queue

    # Exceptions reported as connection-level failures.
    KZ_CONNECTION_ERRORS = (
        kazoo.exceptions.SystemErrorException,
        kazoo.exceptions.ConnectionLossException,
        kazoo.exceptions.MarshallingErrorException,
        kazoo.exceptions.UnimplementedException,
        kazoo.exceptions.OperationTimeoutException,
        kazoo.exceptions.NoAuthException,
        kazoo.exceptions.InvalidACLException,
        kazoo.exceptions.AuthFailedException,
        kazoo.exceptions.SessionExpiredException,
    )

    # Exceptions reported as channel-level failures.  Several classes
    # (marshalling, unimplemented, timeout, no-auth, session-expired) are
    # listed in both tuples.
    KZ_CHANNEL_ERRORS = (
        kazoo.exceptions.RuntimeInconsistencyException,
        kazoo.exceptions.DataInconsistencyException,
        kazoo.exceptions.BadArgumentsException,
        kazoo.exceptions.MarshallingErrorException,
        kazoo.exceptions.UnimplementedException,
        kazoo.exceptions.OperationTimeoutException,
        kazoo.exceptions.ApiErrorException,
        kazoo.exceptions.NoNodeException,
        kazoo.exceptions.NoAuthException,
        kazoo.exceptions.NodeExistsException,
        kazoo.exceptions.NoChildrenForEphemeralsException,
        kazoo.exceptions.NotEmptyException,
        kazoo.exceptions.SessionExpiredException,
        kazoo.exceptions.InvalidCallbackException,
        socket.error,
    )
except ImportError:
    # NOTE: KazooClient and Queue stay undefined on this path; only
    # ``kazoo`` and the two error tuples get fallback values.
    kazoo = None                                    # noqa
    KZ_CONNECTION_ERRORS = KZ_CHANNEL_ERRORS = ()   # noqa

# Port assumed when the connection info or an alternate host does not
# specify one (see Channel._open); also exposed as Transport.default_port.
DEFAULT_PORT = 2181

__author__ = 'Mahendra M <mahendra.m@gmail.com>'


class Channel(virtual.Channel):
    """Zookeeper Channel.

    Every kombu queue is backed by a kazoo ``Queue`` recipe stored under
    ``<vhost>/<queue name>``.  The kazoo client is created lazily, the
    first time :attr:`client` is accessed.
    """

    _client = None
    # Class-level default kept for backward compatibility only;
    # ``__init__`` replaces it with a per-instance dict so that channels
    # never share ``Queue`` objects bound to another channel's client.
    _queues = {}

    def __init__(self, connection, **kwargs):
        """Set up the per-channel queue cache and the base znode path.

        The base path is computed here instead of inside ``_open`` so that
        ``_get_path`` does not depend on the client having been opened
        first.
        """
        super(Channel, self).__init__(connection, **kwargs)
        self._queues = {}
        # Strip slashes instead of blindly dropping the last character:
        # 'foo/', '/foo/' and 'foo' all map to '/foo', while a missing or
        # '/' virtual host maps to the root path '/'.
        vhost = self.connection.client.virtual_host or ''
        self.vhost = '/' + vhost.strip('/')

    def _get_path(self, queue_name):
        """Return the znode path used for ``queue_name``."""
        return os.path.join(self.vhost, queue_name)

    def _get_queue(self, queue_name):
        """Return the cached kazoo ``Queue`` for ``queue_name``.

        The queue object is created on first use and stored in the
        per-channel cache.
        """
        queue = self._queues.get(queue_name)

        if queue is None:
            queue = Queue(self.client, self._get_path(queue_name))
            self._queues[queue_name] = queue

            # Ensure that the queue is created
            len(queue)

        return queue

    def _put(self, queue, message, **kwargs):
        """Serialize ``message`` and append it to ``queue``.

        Returns whatever the kazoo ``Queue.put`` call returns.
        """
        payload = dumps(message)
        # kazoo's Queue.put only accepts byte strings, while dumps() may
        # return text, so encode before handing the payload over.
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        return self._get_queue(queue).put(
            payload,
            priority=self._get_message_priority(message, reverse=True),
        )

    def _get(self, queue):
        """Pop and deserialize the next message from ``queue``.

        Raises:
            Empty: when the queue has no entry to return.
        """
        msg = self._get_queue(queue).get()

        if msg is None:
            raise Empty()

        return loads(bytes_to_str(msg))

    def _purge(self, queue):
        """Remove every entry from ``queue`` and return how many there were."""
        count = 0
        zk_queue = self._get_queue(queue)

        # Queue.get() returns None once the queue is drained.
        while zk_queue.get() is not None:
            count += 1

        return count

    def _delete(self, queue, *args, **kwargs):
        """Purge ``queue`` and delete its znode, if the znode exists."""
        if self._has_queue(queue):
            self._purge(queue)
            self.client.delete(self._get_path(queue))
        # Forget the cached Queue object: it refers to a znode that no
        # longer exists, so a later re-declaration must build a fresh one.
        self._queues.pop(queue, None)

    def _size(self, queue):
        """Return the number of entries currently stored in ``queue``."""
        return len(self._get_queue(queue))

    def _new_queue(self, queue, **kwargs):
        """Create the znode for ``queue`` unless it already exists."""
        if not self._has_queue(queue):
            self._get_queue(queue)

    def _has_queue(self, queue):
        """Return True when the znode for ``queue`` exists."""
        return self.client.exists(self._get_path(queue)) is not None

    def _open(self):
        """Create, start and return a ``KazooClient``.

        The host list is built from the connection's alternate hosts plus
        its primary ``hostname``/``port``; the primary pair is put first
        when it is not already among the alternates.
        """
        conninfo = self.connection.client
        hosts = []
        if conninfo.alt:
            for host_port in conninfo.alt:
                if host_port.startswith('zookeeper://'):
                    host_port = host_port[len('zookeeper://'):]
                if not host_port:
                    continue
                try:
                    host, port = host_port.split(':', 1)
                    host_port = (host, int(port))
                except ValueError:
                    # No (numeric) port in this entry: use the configured
                    # port for the primary host, the default otherwise.
                    if host_port == conninfo.hostname:
                        host_port = (host_port, conninfo.port or DEFAULT_PORT)
                    else:
                        host_port = (host_port, DEFAULT_PORT)
                hosts.append(host_port)
        host_port = (conninfo.hostname, conninfo.port or DEFAULT_PORT)
        if host_port not in hosts:
            hosts.insert(0, host_port)
        conn_str = ','.join('%s:%s' % (h, p) for h, p in hosts)
        conn = KazooClient(conn_str)
        conn.start()
        return conn

    @property
    def client(self):
        """The kazoo client, opened on first access and then reused."""
        if self._client is None:
            self._client = self._open()
        return self._client
class Transport(virtual.Transport):
    """Zookeeper Transport.

    Virtual transport whose channels talk to ZooKeeper through the kazoo
    driver.  Instantiation fails with ImportError when kazoo is missing.
    """

    Channel = Channel

    # Driver identification.
    driver_type = 'zookeeper'
    driver_name = 'kazoo'

    # Connection defaults.
    default_port = DEFAULT_PORT
    polling_interval = 1

    # The base transport's error tuples, extended with the kazoo ones.
    channel_errors = (
        virtual.Transport.channel_errors + KZ_CHANNEL_ERRORS
    )
    connection_errors = (
        virtual.Transport.connection_errors + KZ_CONNECTION_ERRORS
    )

    def __init__(self, *args, **kwargs):
        """Refuse to start without kazoo, then defer to the base class."""
        if kazoo is None:
            raise ImportError('The kazoo library is not installed')

        super(Transport, self).__init__(*args, **kwargs)

    def driver_version(self):
        """Return the version string of the installed kazoo library."""
        return kazoo.__version__