Движок таблицы S3Queue
Этот движок обеспечивает интеграцию с экосистемой Amazon S3 и позволяет выполнять потоковый импорт. Он аналогичен движкам Kafka и RabbitMQ, но предоставляет функции, специфичные для S3.
Важно учитывать следующее примечание из исходного PR по реализации S3Queue: когда к таблице с этим движком присоединяется MATERIALIZED VIEW, движок таблицы S3Queue начинает собирать данные в фоновом режиме.
Создать таблицу
До версии 24.7 требуется использовать префикс s3queue_ для всех настроек, кроме mode, after_processing и keeper_path.
Параметры движка
Параметры S3Queue такие же, как у табличного движка S3. См. раздел «Параметры» здесь.
Пример
Использование именованных коллекций:
Settings
Чтобы получить список настроек, заданных для таблицы, используйте таблицу system.s3_queue_settings. Доступно, начиная с версии 24.10.
Начиная с версии 24.7, настройки S3Queue можно указывать с префиксом s3queue_ или без него:
- Современный синтаксис (24.7+):
processing_threads_num,tracked_file_ttl_secи т. д. - Устаревший синтаксис (все версии):
s3queue_processing_threads_num,s3queue_tracked_file_ttl_secи т. д.
Оба варианта поддерживаются в 24.7+. Примеры на этой странице используют современный синтаксис без префикса.
Mode
Возможные значения:
- unordered — В режиме unordered множество всех уже обработанных файлов отслеживается с помощью постоянных узлов в ZooKeeper.
- ordered — В режиме ordered файлы обрабатываются в лексикографическом порядке. Это означает, что если файл с именем
BBBбыл обработан в какой‑то момент, а позже в бакет был добавлен файл с именемAA, он будет проигнорирован. В ZooKeeper сохраняются только максимальное имя (в лексикографическом смысле) успешно обработанного файла и имена файлов, которые будут повторно загружены после неудачной попытки загрузки.
Значение по умолчанию: ordered в версиях до 24.6. Начиная с 24.6 значение по умолчанию отсутствует, настройку требуется указывать вручную. Для таблиц, созданных в более ранних версиях, значение по умолчанию останется ordered для сохранения совместимости.
after_processing
Что делать с файлом после успешной обработки.
Возможные значения:
- keep.
- delete.
- move.
- tag.
Значение по умолчанию: keep.
Для варианта move требуются дополнительные настройки. В случае перемещения в пределах того же бакета необходимо указать новый префикс пути в параметре after_processing_move_prefix.
Перемещение в другой S3‑бакет требует указания URI целевого бакета в параметре after_processing_move_uri, а также учетных данных доступа к S3 в параметрах after_processing_move_access_key_id и after_processing_move_secret_access_key.
Пример:
Для перемещения данных из одного контейнера Azure в другой необходимо указать строку подключения Blob Storage в параметре after_processing_move_connection_string и имя контейнера в параметре after_processing_move_container. См. настройки AzureQueue.
Для добавления тегов необходимо указать ключ и значение тега в параметрах after_processing_tag_key и after_processing_tag_value.
after_processing_retries
Количество повторных попыток выполнения запрошенного действия послеобработки, после которых попытки прекращаются.
Возможные значения:
- Неотрицательное целое число.
Значение по умолчанию: 10.
after_processing_move_access_key_id
ID ключа доступа (Access Key ID) для S3‑бакета, в который нужно переместить успешно обработанные файлы, если целевым местом назначения является другой S3‑бакет.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_move_prefix
Префикс пути, в который перемещаются успешно обработанные файлы. Применимо как при перемещении в пределах того же бакета, так и при перемещении в другой бакет.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_move_secret_access_key
Secret Access Key для S3‑бакета, в который нужно перемещать успешно обработанные файлы, если целевой ресурс — другой S3‑бакет.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_move_uri
URI S3-бакета, в который следует перемещать успешно обработанные файлы, если местом назначения является другой S3-бакет.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_tag_key
Ключ тега, который будет использоваться для пометки успешно обработанных файлов, если after_processing='tag'.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
after_processing_tag_value
Значение тега, которое будет присвоено успешно обработанным файлам, если after_processing='tag'.
Возможные значения:
- Строка.
Значение по умолчанию: пустая строка.
keeper_path
Путь в ZooKeeper может быть задан в параметрах движка таблицы, либо путь по умолчанию может быть сформирован из пути, указанного в глобальной конфигурации, и UUID таблицы. Возможные значения:
- строка.
Значение по умолчанию: /.
loading_retries
Повторять загрузку файла до указанного количества раз. По умолчанию повторы не выполняются. Возможные значения:
- Положительное целое число.
Значение по умолчанию: 0.
processing_threads_num
Количество потоков обработки. Применяется только в режиме Unordered.
Значение по умолчанию: число CPU или 16.
parallel_inserts
По умолчанию processing_threads_num будет выполнять один INSERT, поэтому файлы будут только загружаться и парситься в несколько потоков.
Но это ограничивает степень параллелизма, поэтому для лучшей пропускной способности используйте parallel_inserts=true — это позволит вставлять данные параллельно (но имейте в виду, что это приведёт к большему количеству создаваемых частей данных для семейства движков MergeTree).
INSERT-запросы будут создаваться с учётом настроек max_process*_before_commit.
Значение по умолчанию: false.
enable_logging_to_s3queue_log
Включает логирование в system.s3queue_log.
Значение по умолчанию: 0.
polling_min_timeout_ms
Указывает минимальное время в миллисекундах, которое ClickHouse ожидает перед следующей попыткой опроса.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 1000.
polling_max_timeout_ms
Определяет максимальное время в миллисекундах, в течение которого ClickHouse ждёт перед запуском следующей попытки опроса.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 10000.
polling_backoff_ms
Определяет дополнительное время ожидания, добавляемое к предыдущему интервалу опроса при отсутствии новых файлов. Следующий опрос выполняется после истечения времени, равного сумме предыдущего интервала и этого значения backoff, либо по наступлении максимального интервала — в зависимости от того, что меньше.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 0.
tracked_files_limit
Позволяет ограничить количество узлов ZooKeeper при использовании режима unordered; не влияет на режим ordered.
Если лимит достигнут, самые старые обработанные файлы будут удалены из узла ZooKeeper и обработаны повторно.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 1000.
tracked_file_ttl_sec
Максимальное время в секундах для хранения обработанных файлов в узле ZooKeeper (по умолчанию хранятся бессрочно) в режиме unordered; не влияет на режим ordered.
По истечении указанного времени файл будет повторно импортирован.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: 0.
cleanup_interval_min_ms
Для режима Ordered. Определяет минимальное значение интервала перепланирования фоновой задачи, которая отвечает за поддержание TTL отслеживаемых файлов и ограничения на их максимальное количество.
Значение по умолчанию: 10000.
cleanup_interval_max_ms
Для режима Ordered. Определяет верхнюю границу интервала перепланирования фоновой задачи, которая отвечает за поддержание TTL отслеживаемых файлов и максимального числа отслеживаемых файлов.
Значение по умолчанию: 30000.
buckets
Для режима Ordered. Доступно начиная с версии 24.6. Если существует несколько реплик таблицы S3Queue, каждая из которых работает с одним и тем же каталогом метаданных в keeper, значение buckets должно быть не меньше количества реплик. Если также используется настройка processing_threads, имеет смысл дополнительно увеличить значение buckets, так как она определяет фактический уровень параллелизма обработки в S3Queue.
use_persistent_processing_nodes
По умолчанию таблица S3Queue всегда использовала эфемерные узлы обработки, что могло приводить к дублированию данных в случае, если сессия ZooKeeper истекает до того, как S3Queue зафиксирует обработанные файлы в ZooKeeper, но после того, как обработка уже началась. Эта настройка гарантирует исключение возможности появления дубликатов при истечении сессии Keeper.
persistent_processing_nodes_ttl_seconds
В случае некорректного завершения работы сервера, если включён use_persistent_processing_nodes, могут остаться неудалённые узлы обработки. Этот параметр определяет период времени, по истечении которого эти узлы обработки могут быть безопасно удалены.
Значение по умолчанию: 3600 (1 час).
Настройки S3
Движок поддерживает все настройки S3. Дополнительную информацию о настройках S3 см. здесь.
Ролевой доступ к S3
S3 Role-Based Access is available in the Scale and Enterprise plans. To upgrade, visit the plans page in the cloud console.
Движок таблицы s3Queue поддерживает ролевую модель доступа. См. документацию здесь с описанием шагов по настройке роли для доступа к вашему бакету.
После настройки роли значение roleARN можно передать через параметр extra_credentials, как показано ниже:
Режим S3Queue ordered
Режим обработки S3Queue позволяет хранить меньше метаданных в ZooKeeper, но имеет ограничение: файлы, добавленные позже по времени, должны иметь имена, которые лексикографически (по алфавитно-цифровому порядку) больше предыдущих.
Режим ordered в S3Queue, так же как и unordered, поддерживает настройку (s3queue_)processing_threads_num (префикс s3queue_ является необязательным), которая позволяет управлять количеством потоков, обрабатывающих файлы S3 локально на сервере.
Кроме того, режим ordered вводит ещё одну настройку под названием (s3queue_)buckets, которая обозначает «логические потоки». В распределённом сценарии, когда есть несколько серверов с репликами таблицы S3Queue, эта настройка определяет количество единиц обработки. Например, каждый поток обработки на каждой реплике S3Queue будет пытаться захватить определённый bucket для обработки, при этом каждый bucket соответствует определённым файлам по хэшу имени файла. Поэтому в распределённом сценарии настоятельно рекомендуется, чтобы значение настройки (s3queue_)buckets было как минимум равно количеству реплик или больше. Допустимо, чтобы число buckets было больше числа реплик. Оптимальный сценарий — когда значение настройки (s3queue_)buckets равно произведению number_of_replicas и (s3queue_)processing_threads_num.
Настройку (s3queue_)processing_threads_num не рекомендуется использовать до версии 24.6.
Настройка (s3queue_)buckets доступна, начиная с версии 24.6.
SELECT из движка таблиц S3Queue
Запросы SELECT по умолчанию запрещены для таблиц S3Queue. Это соответствует общему шаблону работы очереди, когда данные читаются один раз, а затем удаляются из очереди. SELECT запрещён, чтобы предотвратить случайную потерю данных.
Однако иногда это может быть полезно. Для этого необходимо установить настройку stream_like_engine_allow_direct_select в True.
Движок S3Queue имеет специальную настройку для запросов SELECT: commit_on_select. Установите её в False, чтобы сохранять данные в очереди после чтения, или в True, чтобы удалять их.
Описание
SELECT мало полезен для потокового импорта (кроме отладки), потому что каждый файл можно импортировать только один раз. Более практично создавать потоки в реальном времени с помощью materialized views. Для этого:
- Используйте этот движок для создания таблицы, которая будет читать данные из указанного пути в S3 и рассматривать её как поток данных.
- Создайте таблицу с требуемой структурой.
- Создайте materialized view, которое преобразует данные из этого движка и помещает их в ранее созданную таблицу.
Когда MATERIALIZED VIEW подключено к этому движку, оно начинает собирать данные в фоновом режиме.
Пример:
Виртуальные столбцы
_path— путь к файлу._file— имя файла._size— размер файла._time— время создания файла.
Дополнительную информацию о виртуальных столбцах см. здесь.
Подстановочные символы в path
Аргумент path может задавать несколько файлов, используя подстановочные шаблоны в стиле bash. Чтобы файл был обработан, он должен существовать и полностью соответствовать шаблону пути. Перечень файлов определяется во время выполнения SELECT (а не в момент CREATE).
*— Заменяет любое количество любых символов, кроме/, включая пустую строку.**— Заменяет любое количество любых символов, включая/, включая пустую строку.?— Заменяет ровно один произвольный символ.{some_string,another_string,yet_another_one}— Заменяет любую из строк'some_string', 'another_string', 'yet_another_one'.{N..M}— Заменяет любое число в диапазоне от N до M включительно. N и M могут содержать ведущие нули, например000..078.
Конструкции с {} аналогичны табличной функции remote.
Ограничения
- Дубликаты строк могут возникать в результате:
-
во время парсинга происходит исключение в середине обработки файла, и включены повторные попытки через
s3queue_loading_retries; -
S3Queueнастроен на нескольких серверах, указывающих на один и тот же путь в ZooKeeper, и сессия Keeper завершается до того, как один из серверов успел зафиксировать обработанный файл, что может привести к тому, что другой сервер возьмет в обработку файл, который уже мог быть частично или полностью обработан первым сервером; однако это не актуально, начиная с версии 25.8, еслиuse_persistent_processing_nodes = 1. -
аварийного завершения работы сервера.
- Если
S3Queueнастроен на нескольких серверах, указывающих на один и тот же путь в ZooKeeper, и используется режимOrdered, тоs3queue_loading_retriesне будет работать. Это будет скоро исправлено.
Интроспекция
Для интроспекции используйте неперсистентную таблицу system.s3queue и персистентную таблицу system.s3queue_log.
system.s3queue. Эта таблица неперсистентная и отображает состояниеS3Queueв памяти: какие файлы в данный момент обрабатываются, какие файлы уже обработаны или завершились с ошибкой.
Пример:
system.s3queue_log. Персистентная таблица. Содержит ту же информацию, что иsystem.s3queue, но для файлов со статусамиprocessedиfailed.
Таблица имеет следующую структуру:
Чтобы использовать system.s3queue_log, укажите его конфигурацию в конфигурационном файле сервера:
Пример: