From 2906c1bd2d8ef1fed2b7f8e9449f1cb5b7a83402 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Fri, 5 Dec 2025 16:12:08 +0000 Subject: [PATCH 1/4] init --- .../core/concepts/streaming_query/toc_i.yaml | 1 + .../concepts/streaming_query/watermarks.md | 9 ++++ .../yql/reference/syntax/select/group-by.md | 44 ++++++++++++++++++- .../core/yql/reference/syntax/select/with.md | 23 ++++++++++ 4 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 ydb/docs/ru/core/concepts/streaming_query/watermarks.md diff --git a/ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml b/ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml index c6495b91f2db..22d1cc172944 100644 --- a/ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml +++ b/ydb/docs/ru/core/concepts/streaming_query/toc_i.yaml @@ -2,3 +2,4 @@ items: - { name: Обзор, href: index.md } - { name: Форматы данных, href: formats.md } - { name: Чекпойнты, href: checkpoints.md } +- { name: Водяные знаки, href: watermarks.md } diff --git a/ydb/docs/ru/core/concepts/streaming_query/watermarks.md b/ydb/docs/ru/core/concepts/streaming_query/watermarks.md new file mode 100644 index 000000000000..54dcc94e7c0f --- /dev/null +++ b/ydb/docs/ru/core/concepts/streaming_query/watermarks.md @@ -0,0 +1,9 @@ +# Водяные знаки + +Каждое [событие](../datamodel/topic.md#message) в системе потоковой обработки данных имеет ассоциированную с ним временную метку. Эта метка может равняться времени чтения события из [топика](../datamodel/topic.md), может быть получена из данных внутри события или из метаданных [топика](../datamodel/topic.md). + +Поверх этого времени события можно делать сортировку на потоке (внутри [MATCH_RECOGNIZE](../../yql/reference/syntax/select/match_recognize.md#order_by)) или агрегацию на временнОм окне ([GROUP BY HoppingWindow](../../yql/reference/syntax/select/group-by.md#hopping_window)). Эти потоковые операции должны знать текущее время, чтобы на основе этой информации генерировать выходные данные и делать это в режиме реального времени. Времени, получаемого из события, не всегда достаточно, так как события могут приходить нерегулярно или отфильтровываться на более ранних этапах. + +Для решения этой проблемы нужен водяной знак. Это наибольшая временная метка, которая гарантированно находится в прошлом для каждой партиции. + +В условиях распределенных систем, когда часы на разных устройствах могут дрейфовать, а данные - задерживаться из-за проблем с сетью, водяной знак не может полагаться на монотонное возрастание времени событий даже в рамках одной партиции. Для этого в формулу расчета водяного знака закладывается отставание. Сейчас водяной знак считается как время записи события в топик, уменьшенное на 5 секунд. diff --git a/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md b/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md index 8411f4a40e62..f19f9dfbbd1b 100644 --- a/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md +++ b/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md @@ -269,7 +269,7 @@ LIMIT 3; {% endif %} - ## GROUP BY ... HOP +## GROUP BY HOP {#hop} Сгруппировать таблицу по значениям указанных столбцов или выражений, а также подмножества по времени (окно времени). @@ -334,6 +334,48 @@ GROUP BY HOP(ts, "PT1М", "PT1M", "PT1M"); ``` +## GROUP BY HoppingWindow {#hopping_window} + +Новая версия [GROUP BY HOP](#hop) + +{% if select_command == "SELECT STREAM" %} +Отличается от предшественника тем, что не требует указания аргумента `delay` из-за обязательного использования [водяных знаков](../../../../concepts/streaming_query/watermarks.md) +{% else %} +Отличается от предшественника тем, что не требует указания игнорируемого аргумента `delay` +{% endif %} + +### Пример + +{% if select_command == "SELECT STREAM" %} +```yql +SELECT + key, + COUNT(*) +FROM my_topic +WITH ( + FORMAT = json_each_row, + SCHEMA ( + key String, + subkey String, + value String + ) +) +GROUP BY + key, + HoppingWindow(CAST(subkey AS Timestamp), "PT10S", "PT1M"); +``` +{% else %} +```yql +SELECT + key, + COUNT(*) +FROM my_table +GROUP BY + key, + HoppingWindow(CAST(subkey AS Timestamp), "PT10S", "PT1M"); +``` +{% endif %} + ## HAVING {#having} Фильтрация выборки `SELECT` по результатам вычисления [агрегатных функций](../../builtins/aggregation.md). Синтаксис аналогичен конструкции [`WHERE`](where.md). diff --git a/ydb/docs/ru/core/yql/reference/syntax/select/with.md b/ydb/docs/ru/core/yql/reference/syntax/select/with.md index 17df1d29560b..ee4fb981614f 100644 --- a/ydb/docs/ru/core/yql/reference/syntax/select/with.md +++ b/ydb/docs/ru/core/yql/reference/syntax/select/with.md @@ -24,6 +24,13 @@ * `projection.enabled` - флаг включения [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md). Допустимые значения: `true`, `false`. * `projection..type` - тип поля [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md). Допустимые значения: `integer`, `enum`, `date`. * `projection..` - расширенные свойства поля [расширенного партиционирования данных](../../../../concepts/federated_query/s3/partition_projection.md). +{% if select_command == "SELECT STREAM" %} +* `WATERMARK LATE EVENTS POLICY` - политика, определяющая реакцию на событие с временем меньшим, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md). Значение по умолчанию - `WATERMARK_ADJUST_LATE_EVENTS`. Выбрать что-то одно: + * `WATERMARK_ADJUST_LATE_EVENTS` - если у события время меньше, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md), то время этого события исправляется на значение [водяного знака](../../../../concepts/streaming_query/watermarks.md); + * `WATERMARK_DROP_LATE_EVENTS` - отбросить событие с временем меньшим, чем [водяной знак](../../../../concepts/streaming_query/watermarks.md); +* `WATERMARK_GRANULARITY` - периодичность генерации водяных знаков. Чем она меньше, тем больше потребление CPU запросом и тем меньше задержка ответа, и наоборот. Значение по умолчанию - 1 секунда; +* `WATERMARK_IDLE_TIMEOUT` - период, после которого партиция без данных будет исключена из вычисления объединенного водяного знака. Значение по умолчанию - 5 секунд. +{% endif %} {% endif %} @@ -60,3 +67,19 @@ SELECT key, value FROM my_table WITH COLUMNS Struct; ```yql SELECT key, value FROM EACH($my_tables) WITH SCHEMA Struct>; ``` + +{% if select_command == "SELECT STREAM" %} +```yql +SELECT * +FROM my_topic +WITH ( + FORMAT = json_each_row, + SCHEMA ( + ts String + ), + WATERMARK_ADJUST_LATE_EVENTS, + WATERMARK_GRANULARITY="PT1S", + WATERMARK_IDLE_TIMEOUT="PT5S" +); +``` +{% endif %} From b1f371ef3b16552502f0b7b190810010559a55bb Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Thu, 11 Dec 2025 11:41:52 +0300 Subject: [PATCH 2/4] fix warnings --- .../ru/core/concepts/streaming_query/formats.md | 14 +++++++------- ydb/docs/ru/core/concepts/streaming_query/index.md | 12 ++++++------ ydb/docs/ru/core/public-materials/videos/2025.md | 12 ++++++------ 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/ydb/docs/ru/core/concepts/streaming_query/formats.md b/ydb/docs/ru/core/concepts/streaming_query/formats.md index 63a40edae43d..2f53a63c115a 100644 --- a/ydb/docs/ru/core/concepts/streaming_query/formats.md +++ b/ydb/docs/ru/core/concepts/streaming_query/formats.md @@ -66,7 +66,7 @@ WITH FORMAT = "csv_with_names", SCHEMA = ( - Year Int32 NOT NULL, + Year Int32 NOT NULL, Manufacturer String NOT NULL, Model String NOT NULL, Price Double NOT NULL @@ -186,7 +186,7 @@ FROM source_name.input_topic_name WITH ( FORMAT = 'json_as_string', - SCHEMA = + SCHEMA = ( Data Json ) @@ -270,7 +270,7 @@ LIMIT 1; Пример запроса: ```sql -$input = +$input = SELECT CAST(data AS Json) AS json FROM source_name.input_topic_name @@ -290,7 +290,7 @@ SELECT FROM $input; -SELECT +SELECT * FROM $parsed LIMIT 1; @@ -306,8 +306,8 @@ LIMIT 1; Пример запроса: ```sql -$input = -SELECT +$input = +SELECT * FROM source_name.input_topic_name WITH ( @@ -326,7 +326,7 @@ SELECT * FROM (SELECT ts[0] as ts, update from $col) FLATTEN COLUMNS; -SELECT +SELECT * FROM $volumes LIMIT 1; diff --git a/ydb/docs/ru/core/concepts/streaming_query/index.md b/ydb/docs/ru/core/concepts/streaming_query/index.md index 28baced671b5..fb0ad26f4c21 100644 --- a/ydb/docs/ru/core/concepts/streaming_query/index.md +++ b/ydb/docs/ru/core/concepts/streaming_query/index.md @@ -34,13 +34,13 @@ - сохранения [порядковых номеров сообщений](../topic#seqno) для дедупликации в выходных топиках, - сохранения стейтов агрегаций для тасок, содержащих потоковую агрегацию (таких как `GROUP BY HOP` / `MATCH_RECOGNIZE`). -### Использование [читателя](../datamodel/topic#consumer) +## Использование [читателя](../datamodel/topic#consumer) По умолчанию чтение из топика происходит [без использования читателя](../../reference/ydb-sdk/topic.md#no-consumer). Чтобы использовать читателя необходимо предварительно его создать через [CLI](../../reference/ydb-cli/topic-consumer-add) или при создании топика с помощью [CREATE TOPIC](../../yql/reference/syntax/create-topic.md). Далее указать его имя в тексте запроса через `PRAGMA pq.Consumer=my_consumer` (см. пример в [CREATE STREAMING QUERY](../../../yql/reference/syntax/create-streaming-query)). -### Состояние запроса +## Состояние запроса Запросы могут быть в 2-х состояних: запущен или остановлен. При этом в запущенном состоянии запрос может быть в нескольких статусах. Подробную информацию о запросе можно получить через системную таблицу [streaming_queries](../../dev/system-views.md#streaming_queries). @@ -50,11 +50,11 @@ SELECT Path, Status, Text, Run FROM `.sys/streaming_queries`; ``` -### Поддерживаемые типы данных +## Поддерживаемые типы данных Топики {{ ydb-short-name }} хранят неструктурированные данные. Поэтому при чтении необоходимо указывать формат и схему данных (см. [Форматы данных](formats.md)). Запись можно выполнять только в виде неструктурированных данных (например как строка или JSON). -### Конфигурирование +## Конфигурирование Функциональность включается установкой флагов `enable_external_data_sources` и `enable_streaming_queries` в конфигурации кластера. Пример: @@ -65,7 +65,7 @@ feature_flags: enable_streaming_queries: true ``` -### Синтаксис +## Синтаксис Чтение реализовано через [внешние источники данных](../datamodel/external_data_source), поэтому предварительно необходимо создать источник через [CREATE EXTERNAL DATA SOURCE](../../../yql/reference/syntax/create-external-data-source). @@ -86,7 +86,7 @@ CREATE EXTERNAL DATA SOURCE source_name WITH ( - [ALTER STREAMING QUERY](../../../yql/reference/syntax/alter-streaming-query), - [DROP STREAMING QUERY](../../../yql/reference/syntax/drop-streaming-query). -### См. также +## См. также - [Форматы данных](formats.md) - [Чекпойнты](checkpoints.md) diff --git a/ydb/docs/ru/core/public-materials/videos/2025.md b/ydb/docs/ru/core/public-materials/videos/2025.md index 555956f74012..d95eee0b3555 100644 --- a/ydb/docs/ru/core/public-materials/videos/2025.md +++ b/ydb/docs/ru/core/public-materials/videos/2025.md @@ -4,9 +4,9 @@ {% include notitle [use_cases_tag](../_includes/tags.md#use_cases) %} -Доклад Кирилла Сюзева посвящен эволюции CI/CD в сервисе SourceCraft, его архитектуре, выбору и анализу различных решений в области CI/CD (GitLab, GitHub, Arcadia CD), а также решению технических проблем. Одной из таких проблем является Real Time Logging, ключом к решению которой стало использование YDB как места для хранения логов и передаче их пользователю. +Доклад Кирилла Сюзева посвящен эволюции CI/CD в сервисе SourceCraft, его архитектуре, выбору и анализу различных решений в области CI/CD (GitLab, GitHub, Arcadia CD), а также решению технических проблем. Одной из таких проблем является Real Time Logging, ключом к решению которой стало использование YDB как места для хранения логов и передаче их пользователю. -Доклад будет интересен DevOps-инженерам, разработчикам программного обеспечения. +Доклад будет интересен DevOps-инженерам, разработчикам программного обеспечения. @[youtube](https://www.youtube.com/watch?v=_2aeqTQALj0&) @@ -22,7 +22,7 @@ {% list tabs %} -- YouTube +- YouTube @[youtube](https://youtu.be/Dy0VtzQatag?list=PL6Wui14DvQPwuUE1tijVmiBSl2LCLH3ru) @@ -36,13 +36,13 @@ {% include notitle [database_internals_tag](../_includes/tags.md#database_internals) %} -Лекция, расказанная Виталием Гридневым в рамках Школы Анализа Данных, даёт вводное представление об архитектуре YDB: рассматриваются детерминистические транзакции и MVCC, объясняется жизненный цикл выполнения запроса и роль ключевых компонентов системы. На практических примерах показано, как обрабатываются запросы в кластере. Отдельно разбирается реализация вторичных индексов и особенности их добавления к существующим таблицам. В завершение освещаются подходы к тестированию YDB, включая методы верификации корректности и обеспечения надёжности. +Лекция, расказанная Виталием Гридневым в рамках Школы Анализа Данных, даёт вводное представление об архитектуре YDB: рассматриваются детерминистические транзакции и MVCC, объясняется жизненный цикл выполнения запроса и роль ключевых компонентов системы. На практических примерах показано, как обрабатываются запросы в кластере. Отдельно разбирается реализация вторичных индексов и особенности их добавления к существующим таблицам. В завершение освещаются подходы к тестированию YDB, включая методы верификации корректности и обеспечения надёжности. Материал будет полезен backend-разработчикам, инженерам распределённых систем, SRE/DevOps-специалистам и QA-инженерам, работающим с высоконагруженными СУБД. {% list tabs %} -- YouTube +- YouTube @[youtube](https://youtu.be/s8KB7dKvBEc?list=PL6Wui14DvQPwuUE1tijVmiBSl2LCLH3ru) @@ -56,7 +56,7 @@ {% include notitle [database_internals_tag](../_includes/tags.md#database_internals) %} -Антон Барабанов в своем докладе рассказывает о работе с данными в Яндекс.Метрике, включая процессинг данных с использованием {{ ydb-short-name }}. Обсуждаются различные вопросы, возникающие при работе с базами данных — от сложностей с записью и чтением данных до неравномерной нагрузки на узлы кластера, а также вопросы оптимизации запросов, структуры данных и экономической выгоды использования SSD и HDD. +Антон Барабанов в своем докладе рассказывает о работе с данными в Яндекс.Метрике, включая процессинг данных с использованием {{ ydb-short-name }}. Обсуждаются различные вопросы, возникающие при работе с базами данных — от сложностей с записью и чтением данных до неравномерной нагрузки на узлы кластера, а также вопросы оптимизации запросов, структуры данных и экономической выгоды использования SSD и HDD. Эти темы будут интересны инженерам баз данных, разработчикам баз данных и администраторам баз данных. From 50b21f1e2308f7f493c8927407681d647d936413 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Thu, 11 Dec 2025 15:27:39 +0300 Subject: [PATCH 3/4] fix hopping after review --- .../yql/reference/syntax/select/group-by.md | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md b/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md index f19f9dfbbd1b..c3a4e559acd1 100644 --- a/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md +++ b/ydb/docs/ru/core/yql/reference/syntax/select/group-by.md @@ -281,28 +281,28 @@ HOP(time_extractor, hop, interval, delay) Реализованный вариант окна времени называется **hopping window**. Это окно, продвигающееся вперёд дискретными интервалами (параметр `hop`). Общая длительность окна задаётся параметром `interval`. Для определения времени каждого входного события используется параметр `time_extractor`. Это выражение, зависящее только от входных значений столбцов, должно иметь тип `Timestamp`. Оно указывает, откуда именно в данных доставать значение времени. -{% if select_command != "SELECT STREAM" %} При этом происходит следующее: -1. Входная таблица партиционируется по ключам группировки, указанным в `GROUP BY`, без учета HOP. Если кроме HOP в `GROUP BY` ничего нет, то входная таблица попадает в одну партицию. -2. Каждая партиция сортируется по возрастанию значения выражения `time_extractor`. -3. Каждая партиция делится на подмножества (возможно пересекащиеся), на которых вычисляются агрегатные функции. +1. Входная таблица партиционируется по ключам группировки, указанным в `GROUP BY`, без учета HOP. Если кроме HOP в `GROUP BY` ничего нет, то входная таблица попадает в одну партицию; +2. В каждой партиции окно продвигается независимо от других; +{% if select_command == "SELECT STREAM" %} +3. Каждая партиция обрабатывается в порядке возрастания значения выражения `time_extractor`. Допускаются небольшие перестановки строгого порядка возрастания входного потока; +{% else %} +3. Каждая партиция сортируется по возрастанию значения выражения `time_extractor`; {% endif %} - -В каждом потоке (партиции), определяемом значениями всех столбцов группировки, окно продвигается независимо от других потоков. Продвижение окна полностью зависит от самого позднего события в партиции. +4. Каждая партиция делится на возможно пересекающиеся подмножества из событий; +5. На каждом подмножестве событий вычисляются заданные агрегатные функции. {% if select_command == "SELECT STREAM" %} -Поскольку записи в потоках слегка перемешиваются во времени, добавлен параметр `delay`, позволяющий отложить закрытие окна на указанную величину. События, приходящие до текущего окна, игнорируются. +Так как события в партиции могут слегка перемешиваться, добавлен параметр `delay` для указания задержки закрытия окна. События, приходящие до текущего окна, игнорируются. +{% else %} +Параметр `delay` игнорируется так как данные в одной партиции уже отсортированы. {% endif %} Параметры `interval` и `delay` следует задавать кратными параметру `hop`. Некратные интервалы в текущей реализации запрещены. -Параметры `interval` и `hop` следует задавать положительными. +Параметры `interval` и `hop` должны быть ненулевыми. -{% if select_command != "SELECT STREAM" %} -Параметр `delay` в текущей реализации игнорируется т.к. данные в одной партиции уже отсортированы. -{% endif %} - -Для задания `hop`, `interval` и `delay` используется строковое выражение, соответствующее стандарту [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601). Это формат, который используется для конструирования встроенного типа `Interval` [из строки](../../builtins/basic.md#data-type-literals). +Для задания `hop`, `interval` и `delay` используется строковое выражение, соответствующее стандарту [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601). Это формат, который используется для конструирования встроенного типа `Interval` из [строки](../../builtins/basic.md#data-type-literals). При выборке столбцов (между `SELECT ... FROM`) можно использовать функции `HOP_START` и `HOP_END` (без параметров), которые возвращают значение типа `Timestamp` и соответствуют началу и концу текущего окна. @@ -338,10 +338,14 @@ GROUP BY Новая версия [GROUP BY HOP](#hop) +```yql +HoppingWindow(time_extractor, hop, interval) +``` + {% if select_command == "SELECT STREAM" %} -Отличается от предшественника тем, что не требует указания аргумента `delay` из-за обязательного использования [водяных знаков](../../../../concepts/streaming_query/watermarks.md) +Отличается от предшественника тем, что не требует указания аргумента `delay`. В `HoppingWindow` окна будут закрываться по получении [водяного знака](../../../../concepts/streaming_query/watermarks.md), а не по истечении `delay`, как это сделано в `HOP`. {% else %} -Отличается от предшественника тем, что не требует указания игнорируемого аргумента `delay` +Отличается от предшественника тем, что не требует указания игнорируемого аргумента `delay`. {% endif %} ### Пример From 7bdea28849c90efaa569ef34e1117a8b0218a566 Mon Sep 17 00:00:00 2001 From: Aleksei Pozdniakov Date: Thu, 11 Dec 2025 15:28:04 +0300 Subject: [PATCH 4/4] fix watermarks after review --- ydb/docs/ru/core/concepts/streaming_query/index.md | 1 + ydb/docs/ru/core/concepts/streaming_query/watermarks.md | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ydb/docs/ru/core/concepts/streaming_query/index.md b/ydb/docs/ru/core/concepts/streaming_query/index.md index fb0ad26f4c21..ba3d406365d6 100644 --- a/ydb/docs/ru/core/concepts/streaming_query/index.md +++ b/ydb/docs/ru/core/concepts/streaming_query/index.md @@ -90,4 +90,5 @@ CREATE EXTERNAL DATA SOURCE source_name WITH ( - [Форматы данных](formats.md) - [Чекпойнты](checkpoints.md) +- [Водяные знаки](watermarks.md) - [Рецепты работы с потоковыми запросами](../../recipes/streaming_queries/index.md) diff --git a/ydb/docs/ru/core/concepts/streaming_query/watermarks.md b/ydb/docs/ru/core/concepts/streaming_query/watermarks.md index 54dcc94e7c0f..5a568777fea2 100644 --- a/ydb/docs/ru/core/concepts/streaming_query/watermarks.md +++ b/ydb/docs/ru/core/concepts/streaming_query/watermarks.md @@ -2,8 +2,8 @@ Каждое [событие](../datamodel/topic.md#message) в системе потоковой обработки данных имеет ассоциированную с ним временную метку. Эта метка может равняться времени чтения события из [топика](../datamodel/topic.md), может быть получена из данных внутри события или из метаданных [топика](../datamodel/topic.md). -Поверх этого времени события можно делать сортировку на потоке (внутри [MATCH_RECOGNIZE](../../yql/reference/syntax/select/match_recognize.md#order_by)) или агрегацию на временнОм окне ([GROUP BY HoppingWindow](../../yql/reference/syntax/select/group-by.md#hopping_window)). Эти потоковые операции должны знать текущее время, чтобы на основе этой информации генерировать выходные данные и делать это в режиме реального времени. Времени, получаемого из события, не всегда достаточно, так как события могут приходить нерегулярно или отфильтровываться на более ранних этапах. +Поверх этого времени события можно делать сортировку на потоке (внутри [MATCH_RECOGNIZE](../../yql/reference/syntax/select/match_recognize.md#order_by)) или агрегацию на временнОм окне ([HoppingWindow](../../yql/reference/syntax/select/group-by.md#hopping_window)). Эти потоковые операции должны знать текущее время, чтобы на основе этой информации генерировать выходные данные и делать это в режиме реального времени. Времени, получаемого из события, может быть недостаточно, например, когда события приходят нерегулярно по каждому ключу [партиционирования](../datamodel/topic.md#partitioning) или отфильтровываются на более ранних этапах. -Для решения этой проблемы нужен водяной знак. Это наибольшая временная метка, которая гарантированно находится в прошлом для каждой партиции. +Для решения проблемы продвижения времени в отсутствие данных нужно специальное событие, на которое не будут действовать фильтры и которое будет рассылаться по всем партициям сразу. Такое специальное событие называется водяной знак. Это наибольшая временная метка, которая находится в прошлом для каждой партиции. В условиях распределенных систем, когда часы на разных устройствах могут дрейфовать, а данные - задерживаться из-за проблем с сетью, водяной знак не может полагаться на монотонное возрастание времени событий даже в рамках одной партиции. Для этого в формулу расчета водяного знака закладывается отставание. Сейчас водяной знак считается как время записи события в топик, уменьшенное на 5 секунд.