From 037d9018cd5fb36b7ef362571cdab774983d33c5 Mon Sep 17 00:00:00 2001 From: Leto_b Date: Mon, 9 Feb 2026 10:22:56 +0800 Subject: [PATCH] add pipe sugar from 2081 --- .../Table/User-Manual/Data-Sync_apache.md | 34 ++++++++++- .../Table/User-Manual/Data-Sync_timecho.md | 34 ++++++++++- .../Tree/User-Manual/Data-Sync_apache.md | 56 +++++++++++++++---- .../Tree/User-Manual/Data-Sync_timecho.md | 36 +++++++++++- .../User-Manual/Data-Sync_apache.md | 32 +++++++++++ .../User-Manual/Data-Sync_timecho.md | 32 +++++++++++ .../latest/User-Manual/Data-Sync_apache.md | 32 +++++++++++ .../latest/User-Manual/Data-Sync_timecho.md | 32 +++++++++++ .../Table/User-Manual/Data-Sync_apache.md | 32 +++++++++++ .../Table/User-Manual/Data-Sync_timecho.md | 30 ++++++++++ .../Tree/User-Manual/Data-Sync_apache.md | 32 +++++++++++ .../Tree/User-Manual/Data-Sync_timecho.md | 32 +++++++++++ .../User-Manual/Data-Sync_apache.md | 32 +++++++++++ .../User-Manual/Data-Sync_timecho.md | 30 ++++++++++ .../latest/User-Manual/Data-Sync_apache.md | 32 +++++++++++ .../latest/User-Manual/Data-Sync_timecho.md | 32 +++++++++++ 16 files changed, 524 insertions(+), 16 deletions(-) diff --git a/src/UserGuide/Master/Table/User-Manual/Data-Sync_apache.md b/src/UserGuide/Master/Table/User-Manual/Data-Sync_apache.md index e4aa5f985..fb859efa1 100644 --- a/src/UserGuide/Master/Table/User-Manual/Data-Sync_apache.md +++ b/src/UserGuide/Master/Table/User-Manual/Data-Sync_apache.md @@ -81,7 +81,7 @@ By declaratively configuring these three parts in an SQL statement, flexible dat - Data synchronization between IoTDB of 1. x series version and IoTDB of 2. x and above series versions is not supported. - When performing data synchronization tasks, avoid executing any deletion operations to prevent inconsistencies between the two ends. -- The `pipe` and `pipe plugins` for tree models and table models are designed to be isolated from each other. Before creating a `pipe`, it is recommended to first use the `show` command to query the built-in plugins available under the current `-sql_dialect` parameter configuration to ensure syntax compatibility and functional support. +- The `pipe` and `pipe plugins` for tree modes and table modes are designed to be isolated from each other. Before creating a `pipe`, it is recommended to first use the `show` command to query the built-in plugins available under the current `-sql_dialect` parameter configuration to ensure syntax compatibility and functional support. - Does not support the Object data type. ## 2. Usage Instructions @@ -118,6 +118,15 @@ WITH SINK ( **IF NOT EXISTS Semantics**: Ensures that the creation command is executed only if the specified Pipe does not exist, preventing errors caused by attempting to create an already existing Pipe. +**Note**: + +Starting from V2.0.8-beta, when creating a full data synchronization Pipe (e.g. Pipeid: `alldatapipe`), the system will automatically split it into two independent Pipes: + +* History Pipe: The PipeId is the original name plus the suffix `_history` (e.g. `alldatapipe_history`). The source parameter carries the default configurations: `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` +* Realtime Pipe: The PipeId is the original name plus the suffix `_realtime` (e.g. `alldatapipe_realtime`). The source parameter carries the default configuration: `'history.enable'='false'`. If metadata synchronization is configured, the Realtime Pipe will be responsible for sending the data. + +After successful creation, the original PipeId (e.g. `alldatapipe`) will no longer be a valid identifier. When performing task operations such as starting, stopping, deleting, or viewing, you must use the split independent PipeId (i.e. `*_history` or `*_realtime`). For operation examples, see the [View Task](./Data-Sync_apache.md#_2-5-view-task) section + ### 2.2 Start a Task @@ -183,6 +192,29 @@ Example Output of `SHOW PIPES`: - **RemainingEventCount** (statistics may have delays): Number of remaining events, including data and metadata synchronization events, as well as system and user-defined events. - **EstimatedRemainingSeconds** (statistics may have delays): Estimated remaining time to complete the transmission based on the current event count and pipe processing rate. +Example: + +In V2.0.8-beta and later versions, create a full data synchronization task and view the task details. + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 Synchronization Plugins To make the architecture more flexible and adaptable to different synchronization scenarios, IoTDB supports plugin assembly in the synchronization task framework. The system provides some common pre-installed plugins, and you can also customize `processor` and `sink` plugins and load them into the IoTDB system. diff --git a/src/UserGuide/Master/Table/User-Manual/Data-Sync_timecho.md b/src/UserGuide/Master/Table/User-Manual/Data-Sync_timecho.md index 50e289aef..32bd1fafa 100644 --- a/src/UserGuide/Master/Table/User-Manual/Data-Sync_timecho.md +++ b/src/UserGuide/Master/Table/User-Manual/Data-Sync_timecho.md @@ -80,7 +80,7 @@ By declaratively configuring these three parts in an SQL statement, flexible dat - Data synchronization between IoTDB of 1. x series version and IoTDB of 2. x and above series versions is not supported. - When performing data synchronization tasks, avoid executing any deletion operations to prevent inconsistencies between the two ends. -- The `pipe` and `pipe plugins` for tree models and table models are designed to be isolated from each other. Before creating a `pipe`, it is recommended to first use the `show` command to query the built-in plugins available under the current `-sql_dialect` parameter configuration to ensure syntax compatibility and functional support. +- The `pipe` and `pipe plugins` for tree modes and table modes are designed to be isolated from each other. Before creating a `pipe`, it is recommended to first use the `show` command to query the built-in plugins available under the current `-sql_dialect` parameter configuration to ensure syntax compatibility and functional support. - Does not support the Object data type. ## 2. Usage Instructions @@ -117,6 +117,15 @@ WITH SINK ( **IF NOT EXISTS Semantics**: Ensures that the creation command is executed only if the specified Pipe does not exist, preventing errors caused by attempting to create an already existing Pipe. +**Note**: + +Starting from V2.0.8, when creating a full data synchronization Pipe (e.g. Pipeid: `alldatapipe`), the system will automatically split it into two independent Pipes: + +* History Pipe: The PipeId is the original name plus the suffix `_history` (e.g. `alldatapipe_history`). The source parameter carries the default configurations: `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` +* Realtime Pipe: The PipeId is the original name plus the suffix `_realtime` (e.g. `alldatapipe_realtime`). The source parameter carries the default configuration: `'history.enable'='false'`. If metadata synchronization is configured, the Realtime Pipe will be responsible for sending the data. + +After successful creation, the original PipeId (e.g. `alldatapipe`) will no longer be a valid identifier. When performing task operations such as starting, stopping, deleting, or viewing, you must use the split independent PipeId (i.e. `*_history` or `*_realtime`). For operation examples, see the [View Task](./Data-Sync_timecho.md#_2-5-view-task) section + ### 2.2 Start a Task After creation, the task directly enters the RUNNING state and does not require manual startup. However, if the task is stopped using the `STOP PIPE` statement, you need to manually start it using the `START PIPE` statement. If the task stops due to an exception, it will automatically restart to resume data processing: @@ -181,6 +190,29 @@ Example Output of `SHOW PIPES`: - **RemainingEventCount** (statistics may have delays): Number of remaining events, including data and metadata synchronization events, as well as system and user-defined events. - **EstimatedRemainingSeconds** (statistics may have delays): Estimated remaining time to complete the transmission based on the current event count and pipe processing rate. +Example: + +In V2.0.8 and later versions, create a full data synchronization task and view the task details. + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 Synchronization Plugins To make the architecture more flexible and adaptable to different synchronization scenarios, IoTDB supports plugin assembly in the synchronization task framework. The system provides some common pre-installed plugins, and you can also customize `processor` and `sink` plugins and load them into the IoTDB system. diff --git a/src/UserGuide/Master/Tree/User-Manual/Data-Sync_apache.md b/src/UserGuide/Master/Tree/User-Manual/Data-Sync_apache.md index 0219d2a55..513d46c23 100644 --- a/src/UserGuide/Master/Tree/User-Manual/Data-Sync_apache.md +++ b/src/UserGuide/Master/Tree/User-Manual/Data-Sync_apache.md @@ -123,6 +123,16 @@ WITH SINK ( **IF NOT EXISTS semantics**: Used in creation operations to ensure that the create command is executed when the specified Pipe does not exist, preventing errors caused by attempting to create an existing Pipe. +**Note**: + +Starting from V2.0.8-beta, when creating a full data synchronization Pipe (e.g. Pipeid: `alldatapipe`), the system will automatically split it into two independent Pipes: + +* History Pipe: The PipeId is the original name plus the suffix `_history` (e.g. `alldatapipe_history`). The source parameter carries the default configurations: `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` +* Realtime Pipe: The PipeId is the original name plus the suffix `_realtime` (e.g. `alldatapipe_realtime`). The source parameter carries the default configuration: `'history.enable'='false'`. If metadata synchronization is configured, the Realtime Pipe will be responsible for sending the data. + +After successful creation, the original PipeId (e.g. `alldatapipe`) will no longer be a valid identifier. When performing task operations such as starting, stopping, deleting, or viewing, you must use the split independent PipeId (i.e. `*_history` or `*_realtime`). For operation examples, see the [View Task](./Data-Sync_apache.md#_2-5-view-task) section + + ### 2.2 Start Task Start processing data: @@ -186,6 +196,28 @@ The meanings of each column are as follows: - **RemainingEventCount (Statistics with Delay)**: The number of remaining events, which is the total count of all events in the current data synchronization task, including data and schema synchronization events, as well as system and user-defined events. - **EstimatedRemainingSeconds (Statistics with Delay)**: The estimated remaining time, based on the current number of events and the rate at the pipe, to complete the transfer. +Example: + +In V2.0.8-beta and later versions, create a full data synchronization task and view the task details. + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + ### 2.6 Synchronization Plugins To make the overall architecture more flexible to match different synchronization scenario requirements, we support plugin assembly within the synchronization task framework. The system comes with some pre-installed common plugins that you can use directly. At the same time, you can also customize processor plugins and Sink plugins, and load them into the IoTDB system for use. You can view the plugins in the system (including custom and built-in plugins) with the following statement: @@ -483,18 +515,18 @@ pipe_all_sinks_rate_limit_bytes_per_second=-1 ### 5.1 source parameter -| key | value | value range | required or not | default value | -| :------------------------------ | :----------------------------------------------------------- |:-----------------------------------------------------------------------| :------- | :------------- | -| source | iotdb-source | String: iotdb-source | Required | - | -| inclusion | Used to specify the range of data to be synchronized in the data synchronization task, including data, schema, and auth | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | Optional | data.insert | -| inclusion.exclusion | Used to exclude specific operations from the range specified by inclusion, reducing the amount of data synchronized | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | Optional | - | -| mode.streaming | Specifies the capture source for time-series data writes. Applicable when mode.streamingis false, determining the source for capturing data.insertspecified in inclusion. Offers two strategies:- true: ​​Dynamic capture selection.​​ The system adaptively chooses between capturing individual write requests or only TsFile sealing requests based on downstream processing speed. Prioritizes capturing write requests for lower latency when processing is fast; captures only file sealing requests to avoid backlog when slow. Suitable for most scenarios, balancing latency and throughput optimally.- false: ​​Fixed batch capture.​​ Captures only TsFile sealing requests. Suitable for resource-constrained scenarios to reduce system load. Note: The snapshot data captured upon pipe startup is only provided to downstream processing in file format. | Boolean: true / false | Optional | true | -| mode.strict | Determines the strictness when filtering data using time/ path/ database-name/ table-nameparameters:- true: ​​Strict filtering.​​ The system strictly filters captured data according to the given conditions, ensuring only matching data is selected.- false: ​​Non-strict filtering.​​ The system may include some extra data during filtering. Suitable for performance-sensitive scenarios to reduce CPU and I/O consumption. | Boolean: true / false | Optional | true | -| mode.snapshot | Determines the capture mode for time-series data, affecting the dataspecified in inclusion. Offers two modes:- true: ​​Static data capture.​​ Upon pipe startup, a one-time data snapshot is captured. ​​The pipe will automatically terminate (DROP PIPE SQL is executed automatically) after the snapshot data is fully consumed.​​- false: ​​Dynamic data capture.​​ In addition to capturing a snapshot upon startup, the pipe continuously captures subsequent data changes. The pipe runs continuously to handle the dynamic data stream. | Boolean: true / false | Optional | false | -| path | Can be specified when the user connects with sql_dialectset to tree. For upgraded user pipes, the default sql_dialectis tree. This parameter determines the capture scope for time-series data, affecting the dataspecified in inclusion, as well as some sequence-related metadata. Data is selected into the streaming pipe if its tree model path matches the specified path. | String: IoTDB-standard tree path pattern, wildcards allowed | Optional | root.** | -| start-time | The start event time for synchronizing all data, including start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | Optional | Long.MIN_VALUE | -| end-time | The end event time for synchronizing all data, including end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | Optional | Long.MAX_VALUE | -| forwarding-pipe-requests | Whether to forward data written by other Pipes (usually data synchronization) | Boolean: true /false | Optional | true | +| key | value | value range | required or not | default value | +| :------------------------------ |:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-----------------------------------------------------------------------| :------- | :------------- | +| source | iotdb-source | String: iotdb-source | Required | - | +| inclusion | Used to specify the range of data to be synchronized in the data synchronization task, including data, schema, and auth | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | Optional | data.insert | +| inclusion.exclusion | Used to exclude specific operations from the range specified by inclusion, reducing the amount of data synchronized | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | Optional | - | +| mode.streaming | Specifies the capture source for time-series data writes. Applicable when mode.streamingis false, determining the source for capturing data.insertspecified in inclusion. Offers two strategies:- true: ​​Dynamic capture selection.​​ The system adaptively chooses between capturing individual write requests or only TsFile sealing requests based on downstream processing speed. Prioritizes capturing write requests for lower latency when processing is fast; captures only file sealing requests to avoid backlog when slow. Suitable for most scenarios, balancing latency and throughput optimally.- false: ​​Fixed batch capture.​​ Captures only TsFile sealing requests. Suitable for resource-constrained scenarios to reduce system load. Note: The snapshot data captured upon pipe startup is only provided to downstream processing in file format. | Boolean: true / false | Optional | true | +| mode.strict | Determines the strictness when filtering data using time/ path/ database-name/ table-nameparameters:- true: ​​Strict filtering.​​ The system strictly filters captured data according to the given conditions, ensuring only matching data is selected.- false: ​​Non-strict filtering.​​ The system may include some extra data during filtering. Suitable for performance-sensitive scenarios to reduce CPU and I/O consumption. | Boolean: true / false | Optional | true | +| mode.snapshot | Determines the capture mode for time-series data, affecting the dataspecified in inclusion. Offers two modes:- true: ​​Static data capture.​​ Upon pipe startup, a one-time data snapshot is captured. ​​The pipe will automatically terminate (DROP PIPE SQL is executed automatically) after the snapshot data is fully consumed.​​- false: ​​Dynamic data capture.​​ In addition to capturing a snapshot upon startup, the pipe continuously captures subsequent data changes. The pipe runs continuously to handle the dynamic data stream. | Boolean: true / false | Optional | false | +| path | Can be specified when the user connects with sql_dialectset to tree. For upgraded user pipes, the default sql_dialectis tree. This parameter determines the capture scope for time-series data, affecting the dataspecified in inclusion, as well as some sequence-related metadata. Data is selected into the streaming pipe if its tree mode path matches the specified path. | String: IoTDB-standard tree path pattern, wildcards allowed | Optional | root.** | +| start-time | The start event time for synchronizing all data, including start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | Optional | Long.MIN_VALUE | +| end-time | The end event time for synchronizing all data, including end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | Optional | Long.MAX_VALUE | +| forwarding-pipe-requests | Whether to forward data written by other Pipes (usually data synchronization) | Boolean: true /false | Optional | true | | mods | Same as mods.enable, whether to send the MODS file for TSFile. | Boolean: true / false | Optional | false | > 💎 **Note:** The difference between the values of true and false for the data extraction mode `mode.streaming` diff --git a/src/UserGuide/Master/Tree/User-Manual/Data-Sync_timecho.md b/src/UserGuide/Master/Tree/User-Manual/Data-Sync_timecho.md index 3d492ed28..1653f64d8 100644 --- a/src/UserGuide/Master/Tree/User-Manual/Data-Sync_timecho.md +++ b/src/UserGuide/Master/Tree/User-Manual/Data-Sync_timecho.md @@ -125,6 +125,15 @@ WITH SINK ( **IF NOT EXISTS semantics**: Used in creation operations to ensure that the create command is executed when the specified Pipe does not exist, preventing errors caused by attempting to create an existing Pipe. +**Note**: + +Starting from V2.0.8, when creating a full data synchronization Pipe (e.g. Pipeid: `alldatapipe`), the system will automatically split it into two independent Pipes: + +* History Pipe: The PipeId is the original name plus the suffix `_history` (e.g. `alldatapipe_history`). The source parameter carries the default configurations: `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` +* Realtime Pipe: The PipeId is the original name plus the suffix `_realtime` (e.g. `alldatapipe_realtime`). The source parameter carries the default configuration: `'history.enable'='false'`. If metadata synchronization is configured, the Realtime Pipe will be responsible for sending the data. + +After successful creation, the original PipeId (e.g. `alldatapipe`) will no longer be a valid identifier. When performing task operations such as starting, stopping, deleting, or viewing, you must use the split independent PipeId (i.e. `*_history` or `*_realtime`). For operation examples, see the [View Task](./Data-Sync_timecho.md#_2-5-view-task) section + ### 2.2 Start Task Start processing data: @@ -188,6 +197,29 @@ The meanings of each column are as follows: - **RemainingEventCount (Statistics with Delay)**: The number of remaining events, which is the total count of all events in the current data synchronization task, including data and schema synchronization events, as well as system and user-defined events. - **EstimatedRemainingSeconds (Statistics with Delay)**: The estimated remaining time, based on the current number of events and the rate at the pipe, to complete the transfer. +Example: + +In V2.0.8 and later versions, create a full data synchronization task and view the task details. + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 Synchronization Plugins To make the overall architecture more flexible to match different synchronization scenario requirements, we support plugin assembly within the synchronization task framework. The system comes with some pre-installed common plugins that you can use directly. At the same time, you can also customize processor plugins and Sink plugins, and load them into the IoTDB system for use. You can view the plugins in the system (including custom and built-in plugins) with the following statement: @@ -574,10 +606,10 @@ pipe_all_sinks_rate_limit_bytes_per_second=-1 | source | iotdb-source | String: iotdb-source | Required | - | | inclusion | Used to specify the range of data to be synchronized in the data synchronization task, including data, schema, and auth | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | Optional | data.insert | | inclusion.exclusion | Used to exclude specific operations from the range specified by inclusion, reducing the amount of data synchronized | String:all, data(insert,delete), schema(database,timeseries,ttl), auth | Optional | - | -| mode.streaming | Specifies the capture source for time-series data writes. Applicable when mode.streamingis false, determining the source for capturing data.insertspecified in inclusion. Offers two strategies:- true: ​​Dynamic capture selection.​​ The system adaptively chooses between capturing individual write requests or only TsFile sealing requests based on downstream processing speed. Prioritizes capturing write requests for lower latency when processing is fast; captures only file sealing requests to avoid backlog when slow. Suitable for most scenarios, balancing latency and throughput optimally.- false: ​​Fixed batch capture.​​ Captures only TsFile sealing requests. Suitable for resource-constrained scenarios to reduce system load. Note: The snapshot data captured upon pipe startup is only provided to downstream processing in file format. | Boolean: true / false | Optional | true | +| mode.streaming | Specifies the capture source for time-series data writes. Applicable when mode.streamingis false, determining the source for capturing data.insertspecified in inclusion. Offers two strategies:- true: ​​Dynamic capture selection.​​ The system adaptively chooses between capturing individual write requests or only TsFile sealing requests based on downstream processing speed. Prioritizes capturing write requests for lower latency when processing is fast; captures only file sealing requests to avoid backlog when slow. Suitable for most scenarios, balancing latency and throughput optimally.- false: ​​Fixed batch capture.​​ Captures only TsFile sealing requests. Suitable for resource-constrained scenarios to reduce system load. Note: The snapshot data captured upon pipe startup is only provided to downstream processing in file format. | Boolean: true / false |Optional | true | | mode.strict | Determines the strictness when filtering data using time/ path/ database-name/ table-nameparameters:- true: ​​Strict filtering.​​ The system strictly filters captured data according to the given conditions, ensuring only matching data is selected.- false: ​​Non-strict filtering.​​ The system may include some extra data during filtering. Suitable for performance-sensitive scenarios to reduce CPU and I/O consumption. | Boolean: true / false | Optional | true | | mode.snapshot | Determines the capture mode for time-series data, affecting the dataspecified in inclusion. Offers two modes:- true: ​​Static data capture.​​ Upon pipe startup, a one-time data snapshot is captured. ​​The pipe will automatically terminate (DROP PIPE SQL is executed automatically) after the snapshot data is fully consumed.​​- false: ​​Dynamic data capture.​​ In addition to capturing a snapshot upon startup, the pipe continuously captures subsequent data changes. The pipe runs continuously to handle the dynamic data stream. | Boolean: true / false | Optional | false | -| path | Can be specified when the user connects with sql_dialectset to tree. For upgraded user pipes, the default sql_dialectis tree. This parameter determines the capture scope for time-series data, affecting the dataspecified in inclusion, as well as some sequence-related metadata. Data is selected into the streaming pipe if its tree model path matches the specified path. | String: IoTDB-standard tree path pattern, wildcards allowed | Optional | root.** | +| path | Can be specified when the user connects with sql_dialectset to tree. For upgraded user pipes, the default sql_dialectis tree. This parameter determines the capture scope for time-series data, affecting the dataspecified in inclusion, as well as some sequence-related metadata. Data is selected into the streaming pipe if its tree mode path matches the specified path. | String: IoTDB-standard tree path pattern, wildcards allowed | Optional | root.** | | start-time | The start event time for synchronizing all data, including start-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | Optional | Long.MIN_VALUE | | end-time | The end event time for synchronizing all data, including end-time | Long: [Long.MIN_VALUE, Long.MAX_VALUE] | Optional | Long.MAX_VALUE | | forwarding-pipe-requests | Whether to forward data written by other Pipes (usually data synchronization) | Boolean: true / false | Optional | true | diff --git a/src/UserGuide/latest-Table/User-Manual/Data-Sync_apache.md b/src/UserGuide/latest-Table/User-Manual/Data-Sync_apache.md index 498597da6..fb859efa1 100644 --- a/src/UserGuide/latest-Table/User-Manual/Data-Sync_apache.md +++ b/src/UserGuide/latest-Table/User-Manual/Data-Sync_apache.md @@ -118,6 +118,15 @@ WITH SINK ( **IF NOT EXISTS Semantics**: Ensures that the creation command is executed only if the specified Pipe does not exist, preventing errors caused by attempting to create an already existing Pipe. +**Note**: + +Starting from V2.0.8-beta, when creating a full data synchronization Pipe (e.g. Pipeid: `alldatapipe`), the system will automatically split it into two independent Pipes: + +* History Pipe: The PipeId is the original name plus the suffix `_history` (e.g. `alldatapipe_history`). The source parameter carries the default configurations: `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` +* Realtime Pipe: The PipeId is the original name plus the suffix `_realtime` (e.g. `alldatapipe_realtime`). The source parameter carries the default configuration: `'history.enable'='false'`. If metadata synchronization is configured, the Realtime Pipe will be responsible for sending the data. + +After successful creation, the original PipeId (e.g. `alldatapipe`) will no longer be a valid identifier. When performing task operations such as starting, stopping, deleting, or viewing, you must use the split independent PipeId (i.e. `*_history` or `*_realtime`). For operation examples, see the [View Task](./Data-Sync_apache.md#_2-5-view-task) section + ### 2.2 Start a Task @@ -183,6 +192,29 @@ Example Output of `SHOW PIPES`: - **RemainingEventCount** (statistics may have delays): Number of remaining events, including data and metadata synchronization events, as well as system and user-defined events. - **EstimatedRemainingSeconds** (statistics may have delays): Estimated remaining time to complete the transmission based on the current event count and pipe processing rate. +Example: + +In V2.0.8-beta and later versions, create a full data synchronization task and view the task details. + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 Synchronization Plugins To make the architecture more flexible and adaptable to different synchronization scenarios, IoTDB supports plugin assembly in the synchronization task framework. The system provides some common pre-installed plugins, and you can also customize `processor` and `sink` plugins and load them into the IoTDB system. diff --git a/src/UserGuide/latest-Table/User-Manual/Data-Sync_timecho.md b/src/UserGuide/latest-Table/User-Manual/Data-Sync_timecho.md index afbdf1b3f..32bd1fafa 100644 --- a/src/UserGuide/latest-Table/User-Manual/Data-Sync_timecho.md +++ b/src/UserGuide/latest-Table/User-Manual/Data-Sync_timecho.md @@ -117,6 +117,15 @@ WITH SINK ( **IF NOT EXISTS Semantics**: Ensures that the creation command is executed only if the specified Pipe does not exist, preventing errors caused by attempting to create an already existing Pipe. +**Note**: + +Starting from V2.0.8, when creating a full data synchronization Pipe (e.g. Pipeid: `alldatapipe`), the system will automatically split it into two independent Pipes: + +* History Pipe: The PipeId is the original name plus the suffix `_history` (e.g. `alldatapipe_history`). The source parameter carries the default configurations: `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` +* Realtime Pipe: The PipeId is the original name plus the suffix `_realtime` (e.g. `alldatapipe_realtime`). The source parameter carries the default configuration: `'history.enable'='false'`. If metadata synchronization is configured, the Realtime Pipe will be responsible for sending the data. + +After successful creation, the original PipeId (e.g. `alldatapipe`) will no longer be a valid identifier. When performing task operations such as starting, stopping, deleting, or viewing, you must use the split independent PipeId (i.e. `*_history` or `*_realtime`). For operation examples, see the [View Task](./Data-Sync_timecho.md#_2-5-view-task) section + ### 2.2 Start a Task After creation, the task directly enters the RUNNING state and does not require manual startup. However, if the task is stopped using the `STOP PIPE` statement, you need to manually start it using the `START PIPE` statement. If the task stops due to an exception, it will automatically restart to resume data processing: @@ -181,6 +190,29 @@ Example Output of `SHOW PIPES`: - **RemainingEventCount** (statistics may have delays): Number of remaining events, including data and metadata synchronization events, as well as system and user-defined events. - **EstimatedRemainingSeconds** (statistics may have delays): Estimated remaining time to complete the transmission based on the current event count and pipe processing rate. +Example: + +In V2.0.8 and later versions, create a full data synchronization task and view the task details. + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 Synchronization Plugins To make the architecture more flexible and adaptable to different synchronization scenarios, IoTDB supports plugin assembly in the synchronization task framework. The system provides some common pre-installed plugins, and you can also customize `processor` and `sink` plugins and load them into the IoTDB system. diff --git a/src/UserGuide/latest/User-Manual/Data-Sync_apache.md b/src/UserGuide/latest/User-Manual/Data-Sync_apache.md index 2683ef7ec..513d46c23 100644 --- a/src/UserGuide/latest/User-Manual/Data-Sync_apache.md +++ b/src/UserGuide/latest/User-Manual/Data-Sync_apache.md @@ -123,6 +123,16 @@ WITH SINK ( **IF NOT EXISTS semantics**: Used in creation operations to ensure that the create command is executed when the specified Pipe does not exist, preventing errors caused by attempting to create an existing Pipe. +**Note**: + +Starting from V2.0.8-beta, when creating a full data synchronization Pipe (e.g. Pipeid: `alldatapipe`), the system will automatically split it into two independent Pipes: + +* History Pipe: The PipeId is the original name plus the suffix `_history` (e.g. `alldatapipe_history`). The source parameter carries the default configurations: `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` +* Realtime Pipe: The PipeId is the original name plus the suffix `_realtime` (e.g. `alldatapipe_realtime`). The source parameter carries the default configuration: `'history.enable'='false'`. If metadata synchronization is configured, the Realtime Pipe will be responsible for sending the data. + +After successful creation, the original PipeId (e.g. `alldatapipe`) will no longer be a valid identifier. When performing task operations such as starting, stopping, deleting, or viewing, you must use the split independent PipeId (i.e. `*_history` or `*_realtime`). For operation examples, see the [View Task](./Data-Sync_apache.md#_2-5-view-task) section + + ### 2.2 Start Task Start processing data: @@ -186,6 +196,28 @@ The meanings of each column are as follows: - **RemainingEventCount (Statistics with Delay)**: The number of remaining events, which is the total count of all events in the current data synchronization task, including data and schema synchronization events, as well as system and user-defined events. - **EstimatedRemainingSeconds (Statistics with Delay)**: The estimated remaining time, based on the current number of events and the rate at the pipe, to complete the transfer. +Example: + +In V2.0.8-beta and later versions, create a full data synchronization task and view the task details. + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + ### 2.6 Synchronization Plugins To make the overall architecture more flexible to match different synchronization scenario requirements, we support plugin assembly within the synchronization task framework. The system comes with some pre-installed common plugins that you can use directly. At the same time, you can also customize processor plugins and Sink plugins, and load them into the IoTDB system for use. You can view the plugins in the system (including custom and built-in plugins) with the following statement: diff --git a/src/UserGuide/latest/User-Manual/Data-Sync_timecho.md b/src/UserGuide/latest/User-Manual/Data-Sync_timecho.md index 770833400..1653f64d8 100644 --- a/src/UserGuide/latest/User-Manual/Data-Sync_timecho.md +++ b/src/UserGuide/latest/User-Manual/Data-Sync_timecho.md @@ -125,6 +125,15 @@ WITH SINK ( **IF NOT EXISTS semantics**: Used in creation operations to ensure that the create command is executed when the specified Pipe does not exist, preventing errors caused by attempting to create an existing Pipe. +**Note**: + +Starting from V2.0.8, when creating a full data synchronization Pipe (e.g. Pipeid: `alldatapipe`), the system will automatically split it into two independent Pipes: + +* History Pipe: The PipeId is the original name plus the suffix `_history` (e.g. `alldatapipe_history`). The source parameter carries the default configurations: `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` +* Realtime Pipe: The PipeId is the original name plus the suffix `_realtime` (e.g. `alldatapipe_realtime`). The source parameter carries the default configuration: `'history.enable'='false'`. If metadata synchronization is configured, the Realtime Pipe will be responsible for sending the data. + +After successful creation, the original PipeId (e.g. `alldatapipe`) will no longer be a valid identifier. When performing task operations such as starting, stopping, deleting, or viewing, you must use the split independent PipeId (i.e. `*_history` or `*_realtime`). For operation examples, see the [View Task](./Data-Sync_timecho.md#_2-5-view-task) section + ### 2.2 Start Task Start processing data: @@ -188,6 +197,29 @@ The meanings of each column are as follows: - **RemainingEventCount (Statistics with Delay)**: The number of remaining events, which is the total count of all events in the current data synchronization task, including data and schema synchronization events, as well as system and user-defined events. - **EstimatedRemainingSeconds (Statistics with Delay)**: The estimated remaining time, based on the current number of events and the rate at the pipe, to complete the transfer. +Example: + +In V2.0.8 and later versions, create a full data synchronization task and view the task details. + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 Synchronization Plugins To make the overall architecture more flexible to match different synchronization scenario requirements, we support plugin assembly within the synchronization task framework. The system comes with some pre-installed common plugins that you can use directly. At the same time, you can also customize processor plugins and Sink plugins, and load them into the IoTDB system for use. You can view the plugins in the system (including custom and built-in plugins) with the following statement: diff --git a/src/zh/UserGuide/Master/Table/User-Manual/Data-Sync_apache.md b/src/zh/UserGuide/Master/Table/User-Manual/Data-Sync_apache.md index 71ab04473..cb872116b 100644 --- a/src/zh/UserGuide/Master/Table/User-Manual/Data-Sync_apache.md +++ b/src/zh/UserGuide/Master/Table/User-Manual/Data-Sync_apache.md @@ -117,6 +117,15 @@ WITH SINK ( **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 +**注意**:V2.0.8-beta 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: + +* 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` + +* 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 + +创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_apache.md#_2-5-查看任务)小节 + + ### 2.2 开始任务 创建之后,任务直接进入运行状态,不需要执行启动任务。当使用`STOP PIPE`语句停止任务时需手动使用`START PIPE`语句来启动任务,PIPE 发生异常情况停止后会自动重新启动任务,从而开始处理数据: @@ -181,6 +190,29 @@ SHOW PIPE - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据同步的 event,以及系统和用户自定义的 event。 - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 +示例: + +在 V2.0.8-beta 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 同步插件 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: diff --git a/src/zh/UserGuide/Master/Table/User-Manual/Data-Sync_timecho.md b/src/zh/UserGuide/Master/Table/User-Manual/Data-Sync_timecho.md index 0de1d8354..f8e8f1624 100644 --- a/src/zh/UserGuide/Master/Table/User-Manual/Data-Sync_timecho.md +++ b/src/zh/UserGuide/Master/Table/User-Manual/Data-Sync_timecho.md @@ -117,6 +117,14 @@ WITH SINK ( **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 +**注意**:V2.0.8 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: + +* 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` + +* 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 + +创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_timecho.md#_2-5-查看任务)小节 + ### 2.2 开始任务 创建之后,任务直接进入运行状态,不需要执行启动任务。当使用`STOP PIPE`语句停止任务时需手动使用`START PIPE`语句来启动任务,PIPE 发生异常情况停止后会自动重新启动任务,从而开始处理数据: @@ -181,6 +189,28 @@ SHOW PIPE - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据同步的 event,以及系统和用户自定义的 event。 - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 +示例: + +在 V2.0.8 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + ### 2.6 同步插件 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: diff --git a/src/zh/UserGuide/Master/Tree/User-Manual/Data-Sync_apache.md b/src/zh/UserGuide/Master/Tree/User-Manual/Data-Sync_apache.md index 3263023ad..ef18a7118 100644 --- a/src/zh/UserGuide/Master/Tree/User-Manual/Data-Sync_apache.md +++ b/src/zh/UserGuide/Master/Tree/User-Manual/Data-Sync_apache.md @@ -122,6 +122,15 @@ WITH SINK ( **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 +**注意**:V2.0.8-beta 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: + +* 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` + +* 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 + +创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_apache.md#_2-5-查看任务)小节 + + ### 2.2 开始任务 开始处理数据: @@ -187,6 +196,29 @@ SHOW PIPE - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据和元数据同步的 event,以及系统和用户自定义的 event。 - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 +示例: + +在 V2.0.8-beta 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 同步插件 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: diff --git a/src/zh/UserGuide/Master/Tree/User-Manual/Data-Sync_timecho.md b/src/zh/UserGuide/Master/Tree/User-Manual/Data-Sync_timecho.md index ad7b7524b..8ef189055 100644 --- a/src/zh/UserGuide/Master/Tree/User-Manual/Data-Sync_timecho.md +++ b/src/zh/UserGuide/Master/Tree/User-Manual/Data-Sync_timecho.md @@ -124,6 +124,15 @@ WITH SINK ( **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 +**注意**:V2.0.8 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: + +* 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` + +* 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 + +创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_timecho.md#_2-5-查看任务)小节 + + ### 2.2 开始任务 开始处理数据: @@ -188,6 +197,29 @@ SHOW PIPE - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据和元数据同步的 event,以及系统和用户自定义的 event。 - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 +示例: + +在 V2.0.8 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 同步插件 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: diff --git a/src/zh/UserGuide/latest-Table/User-Manual/Data-Sync_apache.md b/src/zh/UserGuide/latest-Table/User-Manual/Data-Sync_apache.md index 71ab04473..cb872116b 100644 --- a/src/zh/UserGuide/latest-Table/User-Manual/Data-Sync_apache.md +++ b/src/zh/UserGuide/latest-Table/User-Manual/Data-Sync_apache.md @@ -117,6 +117,15 @@ WITH SINK ( **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 +**注意**:V2.0.8-beta 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: + +* 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` + +* 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 + +创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_apache.md#_2-5-查看任务)小节 + + ### 2.2 开始任务 创建之后,任务直接进入运行状态,不需要执行启动任务。当使用`STOP PIPE`语句停止任务时需手动使用`START PIPE`语句来启动任务,PIPE 发生异常情况停止后会自动重新启动任务,从而开始处理数据: @@ -181,6 +190,29 @@ SHOW PIPE - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据同步的 event,以及系统和用户自定义的 event。 - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 +示例: + +在 V2.0.8-beta 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 同步插件 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: diff --git a/src/zh/UserGuide/latest-Table/User-Manual/Data-Sync_timecho.md b/src/zh/UserGuide/latest-Table/User-Manual/Data-Sync_timecho.md index 0de1d8354..f8e8f1624 100644 --- a/src/zh/UserGuide/latest-Table/User-Manual/Data-Sync_timecho.md +++ b/src/zh/UserGuide/latest-Table/User-Manual/Data-Sync_timecho.md @@ -117,6 +117,14 @@ WITH SINK ( **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 +**注意**:V2.0.8 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: + +* 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` + +* 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 + +创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_timecho.md#_2-5-查看任务)小节 + ### 2.2 开始任务 创建之后,任务直接进入运行状态,不需要执行启动任务。当使用`STOP PIPE`语句停止任务时需手动使用`START PIPE`语句来启动任务,PIPE 发生异常情况停止后会自动重新启动任务,从而开始处理数据: @@ -181,6 +189,28 @@ SHOW PIPE - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据同步的 event,以及系统和用户自定义的 event。 - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 +示例: + +在 V2.0.8 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + ### 2.6 同步插件 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: diff --git a/src/zh/UserGuide/latest/User-Manual/Data-Sync_apache.md b/src/zh/UserGuide/latest/User-Manual/Data-Sync_apache.md index 3263023ad..ef18a7118 100644 --- a/src/zh/UserGuide/latest/User-Manual/Data-Sync_apache.md +++ b/src/zh/UserGuide/latest/User-Manual/Data-Sync_apache.md @@ -122,6 +122,15 @@ WITH SINK ( **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 +**注意**:V2.0.8-beta 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: + +* 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` + +* 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 + +创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_apache.md#_2-5-查看任务)小节 + + ### 2.2 开始任务 开始处理数据: @@ -187,6 +196,29 @@ SHOW PIPE - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据和元数据同步的 event,以及系统和用户自定义的 event。 - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 +示例: + +在 V2.0.8-beta 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 同步插件 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: diff --git a/src/zh/UserGuide/latest/User-Manual/Data-Sync_timecho.md b/src/zh/UserGuide/latest/User-Manual/Data-Sync_timecho.md index ad7b7524b..8ef189055 100644 --- a/src/zh/UserGuide/latest/User-Manual/Data-Sync_timecho.md +++ b/src/zh/UserGuide/latest/User-Manual/Data-Sync_timecho.md @@ -124,6 +124,15 @@ WITH SINK ( **IF NOT EXISTS 语义**:用于创建操作中,确保当指定 Pipe 不存在时,执行创建命令,防止因尝试创建已存在的 Pipe 而导致报错。 +**注意**:V2.0.8 起,创建一个全量数据同步 Pipe (例如 Pipeid : `alldatapipe`)时,系统会自动将其拆分为两个独立的 Pipe: + +* 历史 Pipe:PipeId 为原名称加 _history后缀(如 `alldatapipe_history`),source 参数默认携带 `'realtime.enable'='false', 'inclusion'='data.insert', 'inclusion.exclusion'=''` + +* 实时 Pipe:PipeId 为原名称加 _realtime后缀(如 `alldatapipe_realtime`),source 参数默认携带 `'history.enable'='false'` ,若配置了元数据同步,则由实时 Pipe 负责发送 + +创建成功后,原 PipeId(如 `alldatapipe`)将不再作为有效标识符。在进行启动、停止、删除、查看等任务操作时,必须使用拆分后的独立 PipeId(即 `*_history`或 `*_realtime`)。操作示例见[查看任务](./Data-Sync_timecho.md#_2-5-查看任务)小节 + + ### 2.2 开始任务 开始处理数据: @@ -188,6 +197,29 @@ SHOW PIPE - **RemainingEventCount(统计存在延迟)**:剩余 event 数,当前数据同步任务中的所有 event 总数,包括数据和元数据同步的 event,以及系统和用户自定义的 event。 - **EstimatedRemainingSeconds(统计存在延迟)**:剩余时间,基于当前 event 个数和 pipe 处速率,预估完成传输的剩余时间。 +示例: + +在 V2.0.8 及之后的版本中,创建一个全量数据同步任务,并查看该任务详情 + +```sql +IoTDB> create pipe alldatapipe with source('inclusion'='all','exclusion'='auth') with sink('node-urls'='127.0.0.1:6668') + +IoTDB> show pipe alldatapipe_history ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_history|2025-12-18T15:06:16.697|RUNNING|{exclusion=auth, history.enable=true, inclusion=data.insert, inclusion.exclusion=, realtime.enable=false}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++-------------------+-----------------------+-------+---------------------------------------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ + +IoTDB> show pipe alldatapipe_realtime ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +| ID| CreationTime| State| PipeSource|PipeProcessor| PipeSink|ExceptionMessage|RemainingEventCount|EstimatedRemainingSeconds| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +|alldatapipe_realtime|2025-12-18T15:06:16.312|RUNNING|{exclusion=auth, history.enable=false, inclusion=all, realtime.enable=true}| {}|{node-urls=127.0.0.1:6668}| | 0| 0.00| ++--------------------+-----------------------+-------+---------------------------------------------------------------------------+-------------+--------------------------+----------------+-------------------+-------------------------+ +``` + + ### 2.6 同步插件 为了使得整体架构更加灵活以匹配不同的同步场景需求,我们支持在同步任务框架中进行插件组装。系统为您预置了一些常用插件可直接使用,同时您也可以自定义 processor 插件 和 Sink 插件,并加载至 IoTDB 系统进行使用。查看系统中的插件(含自定义与内置插件)可以用以下语句: