This document describes the current stable version of Celery (3.1). For development docs, go here.

Source code for celery.utils.debug

# -*- coding: utf-8 -*-
"""
    celery.utils.debug
    ~~~~~~~~~~~~~~~~~~

    Utilities for debugging memory usage.

"""
from __future__ import absolute_import, print_function, unicode_literals

import os

from contextlib import contextmanager
from functools import partial

from celery.five import range
from celery.platforms import signals

try:
    from psutil import Process
except ImportError:
    Process = None  # noqa

# Public names exported by ``from celery.utils.debug import *``.
__all__ = [
    'blockdetection', 'sample_mem', 'memdump', 'sample',
    'humanbytes', 'mem_rss', 'ps',
]

# ``(threshold, suffix)`` pairs consumed by :func:`humanbytes`, ordered from
# the largest unit to the smallest so that the first threshold a value
# reaches is the best fit.  The last entry is the fallback for plain byte
# counts: its threshold of ``0.0`` is falsy, which :func:`humanbytes` uses
# to skip the division.  The suffix is appended verbatim (it is never run
# through ``str.format``), so it must be a plain string.
UNITS = (
    (2 ** 40.0, 'TB'),
    (2 ** 30.0, 'GB'),
    (2 ** 20.0, 'MB'),
    (2 ** 10.0, 'kB'),
    (0.0, 'b'),
)

# Cached :class:`psutil.Process` for the current process; created lazily
# by :func:`ps`.
_process = None
# RSS samples appended by :func:`sample_mem` and cleared by :func:`_memdump`.
_mem_sample = []


def _on_blocking(signum, frame):
    import inspect
    raise RuntimeError(
        'Blocking detection timed-out at: {0}'.format(
            inspect.getframeinfo(frame)
        )
    )


@contextmanager
def blockdetection(timeout):
    """Context manager that arms a ``SIGALRM`` timeout around its body, so
    that blocking functions can be detected.

    :param timeout: Value passed to ``signals.arm_alarm``.  A falsy value
        disables detection and the body simply runs.

    When detection is active, the context yields whatever
    ``signals.arm_alarm`` returns.

    """
    if not timeout:
        yield
        return

    previous = signals['ALRM']
    if previous == _on_blocking:
        # Our own handler is already installed (e.g. nested use), so there
        # is nothing to put back on exit.
        previous = None

    signals['ALRM'] = _on_blocking

    try:
        yield signals.arm_alarm(timeout)
    finally:
        if previous:
            signals['ALRM'] = previous
        signals.reset_alarm()


def sample_mem():
    """Record the current RSS memory usage and return it.

    The value reported by :func:`mem_rss` is appended to the module's
    sample list; the collected statistics can then be output by calling
    :func:`memdump`.

    """
    rss = mem_rss()
    _mem_sample.append(rss)
    return rss
def _memdump(samples=10):
    """Collect the data printed by :func:`memdump` and reset the samples.

    :keyword samples: Maximum number of recorded samples to return.

    :returns: a ``(prev, after_collect)`` tuple, where ``prev`` is a list
        with at most ``samples`` of the values recorded by
        :func:`sample_mem`, and ``after_collect`` is the value of
        :func:`mem_rss` taken after :func:`gc.collect`.

    """
    import gc

    recorded = _mem_sample
    if len(recorded) <= samples:
        prev = list(recorded)
    else:
        # :func:`sample` is a lazy generator over the very list that is
        # emptied below, so it must be consumed *now*; otherwise it would
        # run against an empty list and yield nothing.
        prev = list(sample(recorded, samples))
    _mem_sample[:] = []
    gc.collect()
    after_collect = mem_rss()
    return prev, after_collect
def memdump(samples=10, file=None):
    """Dump memory statistics.

    Prints a sample of the RSS memory values recorded through
    :func:`sample_mem`, followed by the RSS memory in use after
    :func:`gc.collect`.

    :keyword samples: Maximum number of recorded samples to print.
    :keyword file: File object handed to :func:`print` as its ``file``
        argument.

    """
    def emit(text):
        print(text, file=file)

    if ps() is not None:
        prev, after_collect = _memdump(samples)
        if prev:
            emit('- rss (sample):')
            for mem in prev:
                emit('- > {0},'.format(mem))
        emit('- rss (end): {0}.'.format(after_collect))
    else:
        emit('- rss: (psutil not installed).')
def sample(x, n, k=0):
    """Yield up to ``n`` evenly spaced items from the sequence ``x``.

    E.g. if ``n`` is 10, and ``x`` has 100 items, every 10th item is
    yielded.  This is a generator, so the items are read from ``x`` lazily,
    while iterating.

    :param x: Sequence to sample from (must support ``len`` and indexing).
    :param n: Number of items wanted.  Nothing is yielded if this is zero
        or negative.
    :keyword k: Index of the first item to yield (offset).

    If ``x`` has fewer than ``n`` items, consecutive items are yielded
    until the sequence is exhausted.

    """
    if n <= 0:
        return
    # Never step by zero: that would yield the same item ``n`` times when
    # the sequence is shorter than the requested sample.
    step = max(len(x) // n, 1)
    for _ in range(n):
        try:
            yield x[k]
        except IndexError:
            break
        k += step
def hfloat(f, p=5):
    """Convert float to value suitable for humans.

    Numbers without a fractional part are returned as an :class:`int`;
    anything else is returned as a string formatted with precision ``p``.

    :keyword p: Float precision.

    """
    whole = int(f)
    if whole == f:
        return whole
    return '{0:.{p}}'.format(f, p=p)


def humanbytes(s):
    """Convert bytes to human-readable form (e.g. kB, MB).

    The first entry of ``UNITS`` whose threshold ``s`` reaches is used.

    """
    matching = ((div, unit) for div, unit in UNITS if s >= div)
    div, unit = next(matching)
    # A zero divisor marks the plain-bytes entry: use the value as-is.
    amount = s / div if div else s
    return '{0}{1}'.format(hfloat(amount), unit)
def mem_rss():
    """Return RSS memory usage as a humanized string.

    Returns :const:`None` when :func:`ps` has no process object to offer.

    """
    process = ps()
    if process is None:
        return None
    return humanbytes(_process_memory_info(process).rss)
def ps():
    """Return the global :class:`psutil.Process` instance,
    or :const:`None` if :mod:`psutil` is not installed.

    The instance is created for the current process id on first use and
    reused on later calls.

    """
    global _process
    if _process is not None or Process is None:
        # Either already cached, or psutil is unavailable (stays None).
        return _process
    _process = Process(os.getpid())
    return _process
def _process_memory_info(process): try: return process.memory_info() except AttributeError: return process.get_memory_info()