This document describes the current stable version of Celery (4.0). For development docs, go here.
Source code for celery.utils.debug
# -*- coding: utf-8 -*-
"""Utilities for debugging memory usage, blocking calls, etc."""
from __future__ import absolute_import, print_function, unicode_literals
import os
import sys
import traceback
from contextlib import contextmanager
from functools import partial
from pprint import pprint
from celery.five import WhateverIO, items, range
from celery.platforms import signals
try:
from psutil import Process
except ImportError:
Process = None # noqa
__all__ = [
'blockdetection', 'sample_mem', 'memdump', 'sample',
'humanbytes', 'mem_rss', 'ps', 'cry',
]
UNITS = (
(2 ** 40.0, 'TB'),
(2 ** 30.0, 'GB'),
(2 ** 20.0, 'MB'),
(2 ** 10.0, 'KB'),
(0.0, 'b'),
)
_process = None
_mem_sample = []
def _on_blocking(signum, frame):
import inspect
raise RuntimeError(
'Blocking detection timed-out at: {0}'.format(
inspect.getframeinfo(frame)
)
)
@contextmanager
def blockdetection(timeout):
"""Context that raises an exception if process is blocking.
Uses ``SIGALRM`` to detect blocking functions.
"""
if not timeout:
yield
else:
old_handler = signals['ALRM']
old_handler = None if old_handler == _on_blocking else old_handler
signals['ALRM'] = _on_blocking
try:
yield signals.arm_alarm(timeout)
finally:
if old_handler:
signals['ALRM'] = old_handler
signals.reset_alarm()
[docs]def sample_mem():
"""Sample RSS memory usage.
Statistics can then be output by calling :func:`memdump`.
"""
current_rss = mem_rss()
_mem_sample.append(current_rss)
return current_rss
def _memdump(samples=10): # pragma: no cover
S = _mem_sample
prev = list(S) if len(S) <= samples else sample(S, samples)
_mem_sample[:] = []
import gc
gc.collect()
after_collect = mem_rss()
return prev, after_collect
[docs]def memdump(samples=10, file=None): # pragma: no cover
"""Dump memory statistics.
Will print a sample of all RSS memory samples added by
calling :func:`sample_mem`, and in addition print
used RSS memory after :func:`gc.collect`.
"""
say = partial(print, file=file)
if ps() is None:
say('- rss: (psutil not installed).')
return
prev, after_collect = _memdump(samples)
if prev:
say('- rss (sample):')
for mem in prev:
say('- > {0},'.format(mem))
say('- rss (end): {0}.'.format(after_collect))
[docs]def sample(x, n, k=0):
"""Given a list `x` a sample of length ``n`` of that list is returned.
For example, if `n` is 10, and `x` has 100 items, a list of every tenth.
item is returned.
``k`` can be used as offset.
"""
j = len(x) // n
for _ in range(n):
try:
yield x[k]
except IndexError:
break
k += j
def hfloat(f, p=5):
"""Convert float to value suitable for humans.
Arguments:
f (float): The floating point number.
p (int): Floating point precision (default is 5).
"""
i = int(f)
return i if i == f else '{0:.{p}}'.format(f, p=p)
def humanbytes(s):
"""Convert bytes to human-readable form (e.g., KB, MB)."""
return next(
'{0}{1}'.format(hfloat(s / div if div else s), unit)
for div, unit in UNITS if s >= div
)
[docs]def mem_rss():
"""Return RSS memory usage as a humanized string."""
p = ps()
if p is not None:
return humanbytes(_process_memory_info(p).rss)
[docs]def ps(): # pragma: no cover
"""Return the global :class:`psutil.Process` instance.
Note:
Returns :const:`None` if :pypi:`psutil` is not installed.
"""
global _process
if _process is None and Process is not None:
_process = Process(os.getpid())
return _process
def _process_memory_info(process):
try:
return process.memory_info()
except AttributeError:
return process.get_memory_info()
def cry(out=None, sepchr='=', seplen=49): # pragma: no cover
"""Return stack-trace of all active threads.
See Also:
Taken from https://gist.github.com/737056.
"""
import threading
out = WhateverIO() if out is None else out
P = partial(print, file=out)
# get a map of threads by their ID so we can print their names
# during the traceback dump
tmap = {t.ident: t for t in threading.enumerate()}
sep = sepchr * seplen
for tid, frame in items(sys._current_frames()):
thread = tmap.get(tid)
if not thread:
# skip old junk (left-overs from a fork)
continue
P('{0.name}'.format(thread))
P(sep)
traceback.print_stack(frame, file=out)
P(sep)
P('LOCAL VARIABLES')
P(sep)
pprint(frame.f_locals, stream=out)
P('\n')
return out.getvalue()