This document describes the current stable version of Kombu (5.3).

File-system Transport - kombu.transport.filesystem

File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the queue are stored in the data_folder_out directory, and messages read from the queue are read from the data_folder_in directory. Both directories must be created manually. Simple example:

  • Producer:

import kombu

conn = kombu.Connection(
    'filesystem://', transport_options={
        'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
    }
)
conn.connect()

test_queue = kombu.Queue('test', routing_key='test')

with conn as conn:
    with conn.default_channel as channel:
        producer = kombu.Producer(channel)
        producer.publish(
            {'hello': 'world'},
            retry=True,
            exchange=test_queue.exchange,
            routing_key=test_queue.routing_key,
            declare=[test_queue],
            serializer='pickle'
        )

  • Consumer:

import kombu

conn = kombu.Connection(
    'filesystem://', transport_options={
        'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
    }
)
conn.connect()

def callback(body, message):
    print(body, message)
    message.ack()

test_queue = kombu.Queue('test', routing_key='test')

with conn as conn:
    with conn.default_channel as channel:
        consumer = kombu.Consumer(
            channel, [test_queue], accept=['pickle']
        )
        consumer.register_callback(callback)
        with consumer:
            # Wait up to 1 second; raises socket.timeout if no message arrives.
            conn.drain_events(timeout=1)
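
Because both data directories must exist before the transport is used, it can help to create them up front. The following is a minimal sketch using only the standard library. The helper names make_folders and mirror_options are hypothetical and not part of Kombu, and the temporary base directory is an assumption made to keep the sketch self-contained. It also shows that the consumer's options in the example above are the producer's options with the two folders swapped.

```python
import tempfile
from pathlib import Path


def make_folders(options):
    """Create the two data folders named in a transport_options dict."""
    for key in ('data_folder_in', 'data_folder_out'):
        Path(options[key]).mkdir(parents=True, exist_ok=True)
    return options


def mirror_options(options):
    """Return a copy with the in and out folders swapped (for the peer)."""
    mirrored = dict(options)
    mirrored['data_folder_in'] = options['data_folder_out']
    mirrored['data_folder_out'] = options['data_folder_in']
    return mirrored


# A temporary base directory keeps the sketch self-contained (assumption);
# a real deployment would use a fixed path shared by both processes.
base = Path(tempfile.mkdtemp())
producer_options = make_folders({
    'data_folder_in': str(base / 'data_in'),
    'data_folder_out': str(base / 'data_out'),
})
consumer_options = mirror_options(producer_options)
```

Either dict can then be passed as transport_options to kombu.Connection('filesystem://', ...), as in the producer and consumer examples.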

Features

  • Type: Virtual

  • Supports Direct: Yes

  • Supports Topic: Yes

  • Supports Fanout: Yes

  • Supports Priority: No

  • Supports TTL: No

Connection String

The connection string has the following format:

filesystem://

Transport Options

  • data_folder_in - directory from which messages are read when reading from the queue.

  • data_folder_out - directory in which messages are stored when written to the queue.

  • store_processed - if set to True, all processed messages are backed up to processed_folder.

  • processed_folder - directory in which processed files are backed up.

  • control_folder - directory in which the exchange-queue table is stored.
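
To illustrate how these options fit together, the sketch below builds a complete transport_options dict and creates each folder it names. The helper build_options and the folder names are hypothetical. Creating processed_folder and control_folder ahead of time is a precaution assumed here, not a documented requirement.

```python
import tempfile
from pathlib import Path

# Option keys whose values are directories (taken from the list above).
FOLDER_KEYS = (
    'data_folder_in',
    'data_folder_out',
    'processed_folder',
    'control_folder',
)


def build_options(base_dir, store_processed=True):
    """Build a transport_options dict rooted at base_dir and create its folders."""
    base = Path(base_dir)
    options = {
        'data_folder_in': str(base / 'in'),
        'data_folder_out': str(base / 'out'),
        'store_processed': store_processed,
        'processed_folder': str(base / 'processed'),
        'control_folder': str(base / 'control'),
    }
    for key in FOLDER_KEYS:
        Path(options[key]).mkdir(parents=True, exist_ok=True)
    return options


# Temporary directory used only so the sketch runs anywhere (assumption).
options = build_options(tempfile.mkdtemp())
```

The resulting dict is intended to be passed as transport_options to kombu.Connection('filesystem://', ...).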

Transport

class kombu.transport.filesystem.Transport(client, **kwargs)

Filesystem Transport.

class Channel(connection, **kwargs)

Filesystem Channel.

property control_folder
property data_folder_in
property data_folder_out
get_table(exchange)

Get table of bindings for exchange.

property processed_folder
property store_processed
supports_fanout = True

Flag set if the channel supports fanout exchanges.

property transport_options
default_port = 0

Port number used when no port is specified.

driver_name = 'filesystem'

Name of driver library (e.g. ‘py-amqp’, ‘redis’).

driver_type = 'filesystem'

Type of driver. It can be used to distinguish transports using the AMQP protocol (driver_type: ‘amqp’), Redis (driver_type: ‘redis’), etc.

driver_version()
global_state = <kombu.transport.virtual.base.BrokerState object>
implements = {'asynchronous': False, 'exchange_type': frozenset({'direct', 'fanout', 'topic'}), 'heartbeats': False}
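
The implements mapping can be consulted before choosing an exchange type. The sketch below copies the values from the listing above into a plain dict so that it runs without Kombu installed. The helper supports_exchange is hypothetical and not part of the library.

```python
# Values copied from the Transport.implements listing above.
implements = {
    'asynchronous': False,
    'exchange_type': frozenset({'direct', 'fanout', 'topic'}),
    'heartbeats': False,
}


def supports_exchange(capabilities, exchange_type):
    """Return True if exchange_type is among the advertised exchange types."""
    return exchange_type in capabilities['exchange_type']


# 'headers' is included as an example of a type that is not listed.
checks = {
    name: supports_exchange(implements, name)
    for name in ('direct', 'topic', 'fanout', 'headers')
}
```

With Kombu installed, the same check should work against kombu.transport.filesystem.Transport.implements in place of the copied dict.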

Channel

class kombu.transport.filesystem.Channel(connection, **kwargs)

Filesystem Channel.

property control_folder
property data_folder_in
property data_folder_out
get_table(exchange)

Get table of bindings for exchange.

property processed_folder
property store_processed
supports_fanout = True

Flag set if the channel supports fanout exchanges.

property transport_options