The DataFlow Operator records [Kubernetes Events](https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/) for DataFlow resources. Events are attached to the corresponding DataFlow object and can be viewed with `kubectl describe dataflow <name>` or `kubectl get events`.

## Overview

The controller uses a standard Kubernetes `EventRecorder` (via `mgr.GetEventRecorderFor("dataflow-controller")`). Events are emitted during reconcile for:

- Successful creation or update of ConfigMap and Deployment
- Cleanup on DataFlow deletion
- Failures (secrets, ConfigMap, Deployment, status update, cleanup)

RBAC for events is declared in the controller: the operator needs `create` and `patch` on `events` in the core API group.
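As a sketch, the equivalent rule looks roughly like the fragment below (the enclosing Role/ClusterRole, its name, and its scope are assumptions; only the `rules` entry reflects what the text above states):

```yaml
# Illustrative rules fragment for the operator's Role/ClusterRole; metadata omitted.
rules:
  - apiGroups: [""]          # "" is the core API group
    resources: ["events"]
    verbs: ["create", "patch"]
```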
## Event types

Kubernetes supports two event types: **Normal** (informational) and **Warning** (failure or attention needed).

### Normal events

| Reason | Message | When |
|--------|---------|------|
| `ConfigMapCreated` | Created ConfigMap \<name\> | ConfigMap for the DataFlow spec was created |
| `ConfigMapUpdated` | Updated ConfigMap \<name\> | ConfigMap for the DataFlow spec was updated |
| `DeploymentCreated` | Created Deployment \<name\> | Processor Deployment was created |
| `DeploymentUpdated` | Updated Deployment \<name\> | Processor Deployment was updated |
| `ResourcesDeleted` | Deleted Deployment and ConfigMap | Resources were removed during DataFlow deletion |

### Warning events

| Reason | Message | When |
|--------|---------|------|
| `FailedGet` | Unable to fetch DataFlow | The controller could not get the DataFlow object (e.g. not found or API error) |
| `CleanupFailed` | Failed to cleanup resources: \<error\> | Cleanup of Deployment/ConfigMap failed during deletion |
| `SecretResolutionFailed` | Failed to resolve secrets: \<error\> | Referenced secrets could not be resolved for the spec |
| `ConfigMapFailed` | Failed to create or update ConfigMap: \<error\> | ConfigMap create/update failed |
| `DeploymentFailed` | Failed to create or update Deployment: \<error\> | Deployment create/update failed |
| `StatusUpdateFailed` | Unable to update DataFlow status: \<error\> | Updating the DataFlow status field failed |

## Viewing events

```bash
# Events for a specific DataFlow (shown in describe)
kubectl describe dataflow -n <namespace> <name>

# Recent events in a namespace (includes DataFlow-related events)
kubectl get events -n <namespace> --sort-by='.lastTimestamp'

# Only events for a specific DataFlow (by involved object)
kubectl get events -n <namespace> --field-selector involvedObject.name=<dataflow-name>
```

Events are namespaced and tied to the DataFlow `involvedObject`, so they appear in `kubectl describe dataflow` and in namespace event lists.

## Relation to status and metrics

- **Status**: The DataFlow `.status.phase` and `.status.message` are updated on success and failure; events provide an audit trail of what happened and when.
- **Metrics**: For observability, see [Metrics](metrics.md). Events complement metrics by giving discrete, human-readable reasons for failures and key lifecycle changes.
DataFlow Operator supports message transformations that are applied sequentially to each message in the order specified in the configuration. Transformations use [gjson](https://github.com/tidwall/gjson) JSONPath for field access.

> **Note**: This is a simplified English version. For complete documentation, see the [Russian version](../ru/transformations.md).

The `format` value is a [Go time layout](https://pkg.go.dev/time#pkg-constants) string. The default is `RFC3339` (e.g. `2006-01-02T15:04:05Z07:00`). Examples: `RFC3339`, `RFC3339Nano`, or custom layouts like `2006-01-02 15:04:05`.
## Flatten
Expands an array into separate messages, preserving all other fields from the original message. Each array element is merged into the root; objects are flattened to top-level keys. If the field is not an array, the message is returned unchanged. Supports Avro-style arrays wrapped in an object with an `array` key.

### Configuration

```yaml
transformations:
  - type: flatten
    flatten:
      # JSONPath to the array to expand (required)
      field: items
```
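
The flatten semantics can be sketched in Python (illustrative only; the operator itself is implemented in Go and resolves arbitrary JSONPaths, while this sketch handles a top-level field, and the handling of non-object array elements is an assumption):

```python
def flatten(message: dict, field: str) -> list[dict]:
    """Sketch of Flatten: expand message[field] (an array) into one
    message per element, preserving all other fields from the original."""
    value = message.get(field)
    # Unwrap Avro-style arrays wrapped as {"array": [...]}
    if isinstance(value, dict) and "array" in value:
        value = value["array"]
    if not isinstance(value, list):
        return [message]  # not an array: message passes through unchanged
    out = []
    for element in value:
        base = {k: v for k, v in message.items() if k != field}
        if isinstance(element, dict):
            base.update(element)   # objects are merged into the root
        else:
            base[field] = element  # assumption: scalars keep the original key
        out.append(base)
    return out
```

For example, one message with a two-element `items` array becomes two messages that both carry the parent's `id` field.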

## Filter

Keeps only messages where the field at the given JSONPath exists and is *truthy* (boolean `true`, non-empty string, non-zero number). Other messages are dropped. Comparison expressions (e.g. `==`) are not supported; use Router for value-based routing.

### Configuration

```yaml
transformations:
  - type: filter
    filter:
      # JSONPath to a field; message passes if field exists and is truthy (required)
      condition: "$.active"
```
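
The truthiness rule above can be modeled as follows (a Python sketch of the described behavior, not the Go implementation; it checks a bare top-level field name rather than resolving a full `$.`-prefixed JSONPath):

```python
def is_truthy(value) -> bool:
    """Truthy per the Filter rules: boolean true, non-empty string,
    or non-zero number. Everything else (None, objects, arrays) drops."""
    if isinstance(value, bool):
        return value
    if isinstance(value, str):
        return value != ""
    if isinstance(value, (int, float)):
        return value != 0
    return False

def filter_messages(messages: list[dict], field: str) -> list[dict]:
    # Keep only messages where the field exists and is truthy.
    return [m for m in messages if field in m and is_truthy(m[field])]
```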

## Router

Routes messages to different sinks based on conditions. The first matching route determines the sink; if none matches, the message goes to the main sink.

### Condition syntax

- **Truthiness**: `$.field` matches if the field exists and is truthy (non-empty string, non-zero number, `true`).
- **Comparison**: `$.field == 'value'` or `$.field == "value"` matches when the field equals the given string.

Conditions are evaluated in order; the first match wins.

### Configuration

```yaml
transformations:
  - type: router
    router:
      routes:
        - condition: "$.level == 'error'"
          sink:
            type: kafka
            kafka:
              brokers: ["localhost:9092"]
              topic: error-topic
        - condition: "$.level == 'warning'"
          sink:
            type: postgresql
            postgresql:
              connectionString: "postgres://..."
              table: warnings
```
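
The condition evaluation above can be sketched like this (an illustrative Python model of the two condition forms for top-level fields; the real Go implementation resolves full gjson paths and may tokenize conditions differently):

```python
import re

def evaluate_condition(message: dict, condition: str) -> bool:
    """Match a Router condition: either "$.field == 'value'" (string
    equality) or "$.field" (field exists and is truthy)."""
    m = re.fullmatch(r"\$\.(\w+)\s*==\s*['\"](.*)['\"]", condition.strip())
    if m:  # comparison form
        field, expected = m.group(1), m.group(2)
        return message.get(field) == expected
    field = condition.strip().removeprefix("$.")
    return bool(message.get(field))  # truthiness form

def route(message: dict, routes: list[tuple[str, str]], main_sink: str) -> str:
    # First matching route wins; otherwise the message goes to the main sink.
    for condition, sink in routes:
        if evaluate_condition(message, condition):
            return sink
    return main_sink
```

With the configuration above, a message `{"level": "error"}` goes to `error-topic`, `{"level": "warning"}` to the `warnings` table, and anything else to the main sink.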

## Select

Keeps only the specified fields; all others are dropped. Each field is taken by JSONPath; the last path segment is used as the key in the output (e.g. `user.name` → key `name`), so the result is flat.

### Configuration

```yaml
transformations:
  - type: select
    select:
      # List of JSONPath expressions for fields to keep (required)
      fields:
        - id
        - name
```
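
A Python sketch of the Select semantics (illustrative; the Go implementation uses gjson, and dropping fields whose path resolves to nothing is an assumption here):

```python
def select_fields(message: dict, fields: list[str]) -> dict:
    """Sketch of Select: keep only the listed fields. The last path
    segment becomes the output key, so "user.name" yields key "name"
    and the result is flat."""
    out = {}
    for path in fields:
        path = path.removeprefix("$.")
        value = message
        for segment in path.split("."):
            if not isinstance(value, dict) or segment not in value:
                value = None
                break
            value = value[segment]
        if value is not None:  # assumption: unresolved paths are omitted
            out[path.split(".")[-1]] = value
    return out
```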

## Planned transformations

The transformations below are planned and not yet implemented; their intended behavior is described for reference.

### ReplaceField

Renames fields in messages. Useful for normalizing data structure and changing field paths.

#### Configuration

```yaml
transformations:
  - type: replaceField
    replaceField:
      # List of rename rules in format "old.path:new.path" (required)
      renames:
        - key.sku:sku
        - body.lc:lc
```

#### Rename rule format

Rename rules have the format `old.path:new.path`:

- The left part (before `:`) is the JSONPath to the existing field
- The right part (after `:`) is the JSONPath to the new field
- JSONPath syntax is supported (an optional `$.` prefix)

#### Example

**Input:**

```json
{
  "key": {
    "sku": "12345"
  },
  "body": {
    "lc": "en"
  }
}
```

**Output:**

```json
{
  "sku": "12345",
  "lc": "en"
}
```

### HeaderFrom

Extracts data from Kafka message headers and adds it to the message body. Useful for enriching messages with metadata from headers.

#### Configuration

```yaml
transformations:
  - type: headerFrom
    headerFrom:
      # List of mappings in format "headerName:field.path" (required)
      mappings:
        - X-Request-Id:requestId
        - X-Language:metadata.language
```

#### Mapping format

Mappings have the format `headerName:field.path`:

- The left part (before `:`) is the Kafka message header name
- The right part (after `:`) is the JSONPath to the field in the message body where the header value will be written
- JSONPath syntax is supported (an optional `$.` prefix)
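
The intended ReplaceField semantics can be sketched in Python (a model of the planned behavior only; whether emptied parent objects like `key` are pruned from the output, as the example suggests, is left out of this sketch):

```python
def replace_field(message: dict, rename: str) -> dict:
    """Sketch of the planned ReplaceField: move the value at old.path
    to new.path, where paths are dot-separated keys."""
    old_path, new_path = rename.split(":", 1)

    def pop_path(obj: dict, path: str):
        # Walk to the parent of the last segment, then remove the value.
        segments = path.removeprefix("$.").split(".")
        for seg in segments[:-1]:
            obj = obj.get(seg, {})
        return obj.pop(segments[-1], None)

    def set_path(obj: dict, path: str, value) -> None:
        # Create intermediate objects as needed, then write the value.
        segments = path.removeprefix("$.").split(".")
        for seg in segments[:-1]:
            obj = obj.setdefault(seg, {})
        obj[segments[-1]] = value

    value = pop_path(message, old_path)
    if value is not None:
        set_path(message, new_path, value)
    return message
```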