This document describes the current stable version of Celery (4.0). For development docs, go here.

Source code for celery.utils.text

# -*- coding: utf-8 -*-
"""Text formatting utilities."""
from __future__ import absolute_import, unicode_literals
import re
from collections import Callable
from functools import partial
from textwrap import fill
from pprint import pformat
from celery.five import string_t

__all__ = [
    'abbr', 'abbrtask', 'dedent', 'dedent_initial',
    'ensure_newlines', 'ensure_sep',
    'fill_paragraphs', 'indent', 'join',
    'pluralize', 'pretty', 'str_to_list', 'simple_format', 'truncate',
]

UNKNOWN_SIMPLE_FORMAT_KEY = """
Unknown format %{0} in string {1!r}.
Possible causes: Did you forget to escape the expand sign (use '%%{0!r}'),
or did you escape and the value was expanded twice? (%%N -> %N -> %hostname)?
""".strip()

RE_FORMAT = re.compile(r'%(\w)')


[docs]def str_to_list(s): # type: (str) -> List[str] """Convert string to list.""" if isinstance(s, string_t): return s.split(',') return s
[docs]def dedent_initial(s, n=4): # type: (str, int) -> str """Remove identation from first line of text.""" return s[n:] if s[:n] == ' ' * n else s
[docs]def dedent(s, n=4, sep='\n'): # type: (str, int, str) -> str """Remove identation.""" return sep.join(dedent_initial(l) for l in s.splitlines())
[docs]def fill_paragraphs(s, width, sep='\n'): # type: (str, int, str) -> str """Fill paragraphs with newlines (or custom separator).""" return sep.join(fill(p, width) for p in s.split(sep))
[docs]def join(l, sep='\n'): # type: (str, str) -> str """Concatenate list of strings.""" return sep.join(v for v in l if v)
[docs]def ensure_sep(sep, s, n=2): # type: (str, str, int) -> str """Ensure text s ends in separator sep'.""" return s + sep * (n - s.count(sep))
ensure_newlines = partial(ensure_sep, '\n')
[docs]def abbr(S, max, ellipsis='...'): # type: (str, int, str) -> str """Abbreviate word.""" if S is None: return '???' if len(S) > max: return ellipsis and (S[:max - len(ellipsis)] + ellipsis) or S[:max] return S
[docs]def abbrtask(S, max): # type: (str, int) -> str """Abbreviate task name.""" if S is None: return '???' if len(S) > max: module, _, cls = S.rpartition('.') module = abbr(module, max - len(cls) - 3, False) return module + '[.]' + cls return S
[docs]def indent(t, indent=0, sep='\n'): # type: (str, int, str) -> str """Indent text.""" return sep.join(' ' * indent + p for p in t.split(sep))
[docs]def truncate(s, maxlen=128, suffix='...'): # type: (str, int, str) -> str """Truncate text to a maximum number of characters.""" if maxlen and len(s) >= maxlen: return s[:maxlen].rsplit(' ', 1)[0] + suffix return s
[docs]def pluralize(n, text, suffix='s'): # type: (int, str, str) -> str """Pluralize term when n is greater than one.""" if n != 1: return text + suffix return text
[docs]def pretty(value, width=80, nl_width=80, sep='\n', **kw): # type: (str, int, int, str, **Any) -> str """Format value for printing to console.""" if isinstance(value, dict): return '{{{0} {1}'.format(sep, pformat(value, 4, nl_width)[1:]) elif isinstance(value, tuple): return '{0}{1}{2}'.format( sep, ' ' * 4, pformat(value, width=nl_width, **kw), ) else: return pformat(value, width=width, **kw)
def match_case(s, other): # type: (str, str) -> str return s.upper() if other.isupper() else s.lower()
[docs]def simple_format(s, keys, pattern=RE_FORMAT, expand=r'\1'): # type: (str, Mapping[str, str], Pattern, str) -> str """Format string, expanding abbreviations in keys'.""" if s: keys.setdefault('%', '%') def resolve(match): key = match.expand(expand) try: resolver = keys[key] except KeyError: raise ValueError(UNKNOWN_SIMPLE_FORMAT_KEY.format(key, s)) if isinstance(resolver, Callable): return resolver() return resolver return pattern.sub(resolve, s) return s
def remove_repeating_from_task(task_name, s): # type: (str, str) -> str """Given task name, remove repeating module names. Example: >>> remove_repeating_from_task( ... 'tasks.add', ... 'tasks.add(2, 2), tasks.mul(3), tasks.div(4)') 'tasks.add(2, 2), mul(3), div(4)' """ # This is used by e.g. repr(chain), to remove repeating module names. # - extract the module part of the task name module = str(task_name).rpartition('.')[0] + '.' return remove_repeating(module, s) def remove_repeating(substr, s): # type: (str, str) -> str """Remove repeating module names from string. Arguments: task_name (str): Task name (full path including module), to use as the basis for removing module names. s (str): The string we want to work on. Example: >>> _shorten_names( ... 'x.tasks.add', ... 'x.tasks.add(2, 2) | x.tasks.add(4) | x.tasks.mul(8)', ... ) 'x.tasks.add(2, 2) | add(4) | mul(8)' """ # find the first occurrence of substr in the string. index = s.find(substr) if index >= 0: return ''.join([ # leave the first occurance of substr untouched. s[:index + len(substr)], # strip seen substr from the rest of the string. s[index + len(substr):].replace(substr, ''), ]) return s