Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 122 additions & 2 deletions cmd/cdc/cli/cli_changefeed_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
package cli

import (
"bytes"
"encoding/json"

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
v2 "github.com/pingcap/ticdc/api/v2"
"github.com/pingcap/ticdc/cmd/cdc/factory"
Expand All @@ -25,7 +29,7 @@ import (
"github.com/spf13/cobra"
)

// cfMeta holds changefeed info and changefeed status.
// cfMeta holds changefeed info and changefeed status for JSON output.
type cfMeta struct {
UpstreamID uint64 `json:"upstream_id"`
ID string `json:"id"`
Expand All @@ -46,17 +50,54 @@ type cfMeta struct {
TaskStatus []config.CaptureTaskStatus `json:"task_status,omitempty"`
}

// cfMetaSimplifiedTOML holds simplified changefeed info for TOML output.
type cfMetaSimplifiedTOML struct {
UpstreamID uint64 `toml:"upstream-id"`
ID string `toml:"id"`
Keyspace string `toml:"keyspace"`
FeedState config.FeedState `toml:"state"`
CheckpointTSO uint64 `toml:"checkpoint-tso"`
CheckpointTime api.JSONTime `toml:"checkpoint-time"`
RunningError *config.RunningError `toml:"error,omitempty"`
}

// cfMetaTOML holds changefeed info for TOML output. Config is interface{}
// because it holds a map[string]interface{} produced by BurntSushi/toml
// encoding (which correctly handles time.Duration as human-readable strings).
type cfMetaTOML struct {
UpstreamID uint64 `toml:"upstream-id"`
ID string `toml:"id"`
Keyspace string `toml:"keyspace"`
SinkURI string `toml:"sink-uri"`
Config interface{} `toml:"config"`
CreateTime api.JSONTime `toml:"create-time"`
StartTs uint64 `toml:"start-ts"`
ResolvedTs uint64 `toml:"resolved-ts"`
TargetTs uint64 `toml:"target-ts"`
CheckpointTSO uint64 `toml:"checkpoint-tso"`
CheckpointTime api.JSONTime `toml:"checkpoint-time"`
Engine config.SortEngine `toml:"sort-engine,omitempty"`
FeedState config.FeedState `toml:"state"`
RunningError *config.RunningError `toml:"error,omitempty"`
ErrorHis []int64 `toml:"error-history,omitempty"`
CreatorVersion string `toml:"creator-version"`
TaskStatus []config.CaptureTaskStatus `toml:"task-status,omitempty"`
}

// queryChangefeedOptions defines flags for the `cli changefeed query` command.
type queryChangefeedOptions struct {
apiClientV2 apiv2client.APIV2Interface
changefeedID string
simplified bool
keyspace string
outputFormat string
}

// newQueryChangefeedOptions creates new options for the `cli changefeed query` command.
func newQueryChangefeedOptions() *queryChangefeedOptions {
return &queryChangefeedOptions{}
return &queryChangefeedOptions{
outputFormat: "json",
}
}

// addFlags receives a *cobra.Command reference and binds
Expand All @@ -65,6 +106,7 @@ func (o *queryChangefeedOptions) addFlags(cmd *cobra.Command) {
cmd.PersistentFlags().StringVarP(&o.keyspace, "keyspace", "k", "default", "Replication task (changefeed) Keyspace")
cmd.PersistentFlags().BoolVarP(&o.simplified, "simple", "s", false, "Output simplified replication status")
cmd.PersistentFlags().StringVarP(&o.changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID")
cmd.PersistentFlags().StringVarP(&o.outputFormat, "output", "o", "json", "Output format (json|toml)")
_ = cmd.MarkPersistentFlagRequired("changefeed-id")
}

Expand All @@ -81,13 +123,31 @@ func (o *queryChangefeedOptions) complete(f factory.Factory) error {
// run the `cli changefeed query` command.
func (o *queryChangefeedOptions) run(cmd *cobra.Command) error {
ctx := cmd.Context()

format, err := util.ParseOutputFormat(o.outputFormat)
if err != nil {
return err
}

if o.simplified {
infos, err := o.apiClientV2.Changefeeds().List(ctx, o.keyspace, "all")
if err != nil {
return errors.Trace(err)
}
for _, info := range infos {
if info.ID == o.changefeedID {
if format == util.OutputFormatTOML {
simplified := &cfMetaSimplifiedTOML{
UpstreamID: info.UpstreamID,
ID: info.ID,
Keyspace: info.Keyspace,
FeedState: info.FeedState,
CheckpointTSO: info.CheckpointTSO,
CheckpointTime: info.CheckpointTime,
RunningError: info.RunningError,
}
return util.TOMLPrint(cmd, simplified)
}
return util.JSONPrint(cmd, info)
}
}
Expand All @@ -98,6 +158,32 @@ func (o *queryChangefeedOptions) run(cmd *cobra.Command) error {
if err != nil {
return err
}

if format == util.OutputFormatTOML {
cfgMap, err := configToMap(detail.Config, format)
if err != nil {
return errors.Annotate(err, "marshal changefeed config")
}
meta := &cfMetaTOML{
UpstreamID: detail.UpstreamID,
ID: detail.ID,
Keyspace: detail.Keyspace,
SinkURI: detail.SinkURI,
Config: cfgMap,
CreateTime: api.JSONTime(detail.CreateTime),
StartTs: detail.StartTs,
ResolvedTs: detail.ResolvedTs,
TargetTs: detail.TargetTs,
CheckpointTSO: detail.CheckpointTs,
CheckpointTime: detail.CheckpointTime,
FeedState: detail.State,
RunningError: detail.Error,
CreatorVersion: detail.CreatorVersion,
TaskStatus: detail.TaskStatus,
}
return util.TOMLPrint(cmd, meta)
}

meta := &cfMeta{
UpstreamID: detail.UpstreamID,
ID: detail.ID,
Expand Down Expand Up @@ -136,3 +222,37 @@ func newCmdQueryChangefeed(f factory.Factory) *cobra.Command {

return command
}

// configToMap serializes a ReplicaConfig to a map[string]interface{} using
// either JSON or TOML encoding. The encoding determines the map's key naming
// convention: JSON produces snake_case keys, TOML produces kebab-case keys.
//
// The TOML path converts through the internal config.ReplicaConfig (which has
// toml struct tags) and encodes with BurntSushi/toml, keeping TOML-specific
// concerns out of the API model.
func configToMap(cfg *v2.ReplicaConfig, format util.OutputFormat) (map[string]interface{}, error) {
if cfg == nil {
return nil, nil
}
if format == util.OutputFormatTOML {
internalCfg := cfg.ToInternalReplicaConfig()
var buf bytes.Buffer
if err := toml.NewEncoder(&buf).Encode(internalCfg); err != nil {
return nil, errors.Trace(err)
}
var m map[string]interface{}
if _, err := toml.NewDecoder(&buf).Decode(&m); err != nil {
return nil, errors.Trace(err)
}
return m, nil
}
data, err := json.Marshal(cfg)
if err != nil {
return nil, errors.Trace(err)
}
var m map[string]interface{}
if err := json.Unmarshal(data, &m); err != nil {
return nil, errors.Trace(err)
}
return m, nil
}
Loading