
influxdb: Add ability to specify tags #467

Open

BrianJKoopman wants to merge 10 commits into main from koopman/influxdb-tagging

Conversation

@BrianJKoopman (Member) commented Apr 7, 2026

Description

This PR (built on top of the refactor in #466) adds the ability to specify tags for each field published to InfluxDB by including the new 'influxdb_tags' dict in the message published on an OCS Feed.

The new tagging structure is only applied for agents that publish this new 'influxdb_tags' dict. Messages with this dict will now look like:

    message = {
        'block_name': <Key to identify group of co-sampled data>,
        'timestamps': [ctime1, ctime2, ...],
        'influxdb_tags': {
            'field_name_1': {'tag1': tag1_1, 'tag2': tag1_2, '_field': 'value'},
            'field_name_2': {'tag1': tag2_1, 'tag2': tag2_2, '_field': 'value'}
        },
        'data': {
            'field_name_1': [data1_1, data1_2, ...],
            'field_name_2': [data2_1, data2_2, ...]
        }
    }

Each tag provided for an (OCS) field will be applied, the data will be published to a measurement named after the 'agent_class', and the (InfluxDB) field key will be the string provided in '_field'.

With the new tags, this changes the structure of a write from something like:

['observatory.fake-data1,feed=false_temperatures channel_00=0.20307 1775502374078489088',
 'observatory.fake-data1,feed=false_temperatures channel_01=0.35795 1775502374078489088',
 'observatory.fake-data1,feed=false_temperatures channel_00=0.20548 1775502375078489088',
 'observatory.fake-data1,feed=false_temperatures channel_01=0.36313 1775502375078489088']

to:

['FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20307 1775502374078489088',
 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.35795 1775502374078489088',
 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=0 temperature=0.20548 1775502375078489088',
 'FakeDataAgent,feed=false_temperatures,address_root=observatory,instance_id=fake-data1,channel=1 temperature=0.36313 1775502375078489088']
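To illustrate the mapping, here is a minimal, hypothetical sketch of how a tagged message could be flattened into line protocol points. This is not the actual OCS publisher code; the helper name `message_to_lines` and the `extra_tags` parameter (standing in for the agent-level tags like `feed` and `instance_id`) are assumptions for the example.

```python
def message_to_lines(message, measurement, extra_tags=None):
    """Flatten an OCS-style message with 'influxdb_tags' into
    InfluxDB line protocol strings (timestamps in nanoseconds)."""
    lines = []
    extra_tags = extra_tags or {}
    for field, values in message['data'].items():
        tags = dict(message['influxdb_tags'][field])
        influx_field = tags.pop('_field')  # the InfluxDB field key
        all_tags = {**extra_tags, **tags}
        tag_str = ','.join(f"{k}={v}" for k, v in all_tags.items())
        for t, v in zip(message['timestamps'], values):
            lines.append(
                f"{measurement},{tag_str} {influx_field}={v} {int(t * 1e9)}"
            )
    return lines


message = {
    'block_name': 'temps',
    'timestamps': [1775502374.078489],
    'influxdb_tags': {
        'channel_00': {'channel': 0, '_field': 'temperature'},
    },
    'data': {
        'channel_00': [0.20307],
    },
}

for line in message_to_lines(message, 'FakeDataAgent',
                             {'feed': 'false_temperatures'}):
    print(line)
```

Each OCS field contributes its own tag set, so co-sampled fields can land in the same measurement while remaining distinguishable by tags rather than by field-key naming.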

Motivation and Context

Resolves #175.

Some agents in socs have a large number of fields, and writing Grafana dashboards that query all of those fields results in a large number of queries to InfluxDB. Tagging, as implemented here, should enable writing more efficient queries.

The old structure in OCS violated essentially every schema and data-layout suggestion made by InfluxDB: we were encoding data in both the measurement names and the field keys. The new structure fixes both of these issues.

How Has This Been Tested?

I have run the agent locally in both 'json' and 'line' protocol modes and verified how the data is presented in InfluxDB through Grafana while running the modified FakeDataAgent.

This results in being able to make queries like this:
[screenshot: Grafana query using the new tags]

I also ran an unmodified FakeDataAgent alongside the modified one. While writing to InfluxDB I also wrote to .g3 file via the Aggregator agent.

I'm not 100% sure, so it would be great if reviewers could check this, but I don't see the influxdb_tags making it into the .g3 files:

$ spt3g-dump 1775508288.g3
Frame (Housekeeping) [
"description" (spt3g.core.G3String) => "HK data"
"hkagg_type" (spt3g.core.G3Int) => 0
"hkagg_version" (spt3g.core.G3Int) => 2
"session_id" (spt3g.core.G3Int) => 476608725980044724
"start_time" (spt3g.core.G3Double) => 1.77551e+09
]
Frame (Housekeeping) [
"hkagg_type" (spt3g.core.G3Int) => 1
"hkagg_version" (spt3g.core.G3Int) => 2
"providers" (spt3g.core.G3VectorFrameObject) => [0x561e443ba4c0]
"session_id" (spt3g.core.G3Int) => 476608725980044724
"timestamp" (spt3g.core.G3Double) => 1.77551e+09
]
Frame (Housekeeping) [
"address" (spt3g.core.G3String) => "observatory.fake-data1.feeds.false_temperatures"
"block_names" (spt3g.core.G3VectorString) => [temps]
"blocks" (spt3g.core.G3VectorFrameObject) => [0x561e443a7820]
"hkagg_type" (spt3g.core.G3Int) => 2
"hkagg_version" (spt3g.core.G3Int) => 2
"prov_id" (spt3g.core.G3Int) => 1
"provider_session_id" (spt3g.core.G3String) => "1775505864.4119794"
"session_id" (spt3g.core.G3Int) => 476608725980044724
"timestamp" (spt3g.core.G3Double) => 1.77551e+09
]
Frame (Housekeeping) [
"address" (spt3g.core.G3String) => "observatory.fake-data1.feeds.false_temperatures"
"block_names" (spt3g.core.G3VectorString) => [temps]
"blocks" (spt3g.core.G3VectorFrameObject) => [0x561e443dea00]
"hkagg_type" (spt3g.core.G3Int) => 2
"hkagg_version" (spt3g.core.G3Int) => 2
"prov_id" (spt3g.core.G3Int) => 1
"provider_session_id" (spt3g.core.G3String) => "1775505864.4119794"
"session_id" (spt3g.core.G3Int) => 476608725980044724
"timestamp" (spt3g.core.G3Double) => 1.77551e+09
]
Frame (Housekeeping) [
"address" (spt3g.core.G3String) => "observatory.fake-data1.feeds.false_temperatures"
"block_names" (spt3g.core.G3VectorString) => [temps]
"blocks" (spt3g.core.G3VectorFrameObject) => [0x561e443f4470]
"hkagg_type" (spt3g.core.G3Int) => 2
"hkagg_version" (spt3g.core.G3Int) => 2
"prov_id" (spt3g.core.G3Int) => 1
"provider_session_id" (spt3g.core.G3String) => "1775505864.4119794"
"session_id" (spt3g.core.G3Int) => 476608725980044724
"timestamp" (spt3g.core.G3Double) => 1.77551e+09
]

Trying to look more closely:

$ python
Python 3.11.9 (main, Jul 29 2024, 17:04:41) [GCC 14.1.1 20240720] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from so3g import hk
>>> scanner = hk.HKArchiveScanner()
>>> import glob
>>> files = glob.glob('*.g3')
>>> files
['1775508987.g3', '1775503131.g3', '1775508113.g3', '1775500288.g3', '1775509110.g3', '1775505088.g3', '1775500466.g3', '1775501885.g3', '1775504376.g3', '1775507755.g3', '1775500111.g3', '1775507576.g3', '1775503488.g3', '1775500999.g3', '1775507399.g3', '1775508664.g3', '1775506339.g3', '1775507933.g3', '1775503665.g3', '1775508542.g3', '1775506162.g3', '1775500642.g3', '1775506693.g3', '1775502597.g3', '1775508468.g3', '1775507221.g3', '1775506868.g3', '1775507046.g3', '1775501707.g3', '1775502242.g3', '1775509287.g3', '1775503310.g3', '1775506517.g3', '1775502954.g3', '1775508816.g3', '1775504198.g3', '1775501530.g3', '1775500819.g3', '1775504021.g3', '1775502775.g3', '1775508288.g3', '1775502065.g3', '1775502422.g3', '1775504912.g3', '1775504734.g3', '1775501351.g3', '1775503842.g3', '1775505863.g3', '1775501176.g3', '1775505984.g3', '1775504555.g3']
>>> for f in files:
...     scanner.process_file(f)
...
>>> arc = scanner.finalize()
>>> arc.simple('temperature')
(array([], dtype=float64), array([], dtype=float64))
>>> arc.simple('temperatures')
(array([], dtype=float64), array([], dtype=float64))
>>> arc.simple('channel_00')
(array([1.77550005e+09, 1.77550005e+09, 1.77550005e+09, ...,
       1.77550928e+09, 1.77550928e+09, 1.77550929e+09]), array([0.13146559, 0.12866655, 0.13144499, ..., 0.13128748, 0.1345487 ,
       0.13324801]))
>>> arc.simple('channel_01')
(array([1.77550005e+09, 1.77550005e+09, 1.77550005e+09, ...,
       1.77550928e+09, 1.77550928e+09, 1.77550929e+09]), array([0.37091482, 0.36679523, 0.36781594, ..., 0.08374199, 0.0795275 ,
       0.08244495]))
>>> arc.simple('influxdb_tags')
(array([], dtype=float64), array([], dtype=float64))

So I think the old structure remains in place within .g3.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Checklist:

  • My code follows the code style of this project.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.

@BrianJKoopman (Member, Author) commented May 1, 2026

In order to make the comparison easier, I generated some two-minute .g3 files with and without tagging: influxdb-tag-vs-no-tag-g3-files.tar.gz

Can someone take a look at these for differences? Maybe @kmharrington or @mhasself?

EDIT: These just have a fake data agent (w/ 2 channels), the registry, the InfluxDB v1 publisher agent, and the aggregator agent running.

@mhasself (Member) left a comment

Looks good -- some comments, but I think they are minor (despite how many paragraphs were spent).

I agree that this should have no impact on the G3 data.

Comment thread ocs/ocs_feed.py
@staticmethod
def verify_influxdb_tags(message):
"""Check the 'influxdb_tags' to make sure all the needed information is
provided.
Note it's acceptable for influxdb_tags to be entirely empty.

Comment thread ocs/ocs_feed.py
error_msg = f"'_field' not supplied with 'influxdb_tags' for tag set {v}"
raise ValueError(error_msg)

# check that all fields have a corresponding tag
These checks will not fail if influxdb_tags contains a key that is not also in data. Should that be acceptable?
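If that asymmetry should be rejected, a symmetric key check is one option. The sketch below is hypothetical (the function name `check_tag_data_keys` and the error wording are assumptions, not the PR's code):

```python
def check_tag_data_keys(message):
    """Require 'influxdb_tags' and 'data' to cover exactly the same fields."""
    tag_keys = set(message.get('influxdb_tags', {}))
    data_keys = set(message.get('data', {}))
    missing_tags = data_keys - tag_keys   # fields published without tags
    orphan_tags = tag_keys - data_keys    # tag entries with no matching field
    if missing_tags:
        raise ValueError(f"fields missing influxdb_tags: {sorted(missing_tags)}")
    if orphan_tags:
        raise ValueError(f"influxdb_tags with no matching field: {sorted(orphan_tags)}")


# A well-formed message passes silently:
check_tag_data_keys({'data': {'channel_00': [0.2]},
                     'influxdb_tags': {'channel_00': {'_field': 'temperature'}}})
```

Whether orphan tag entries should be a hard error or just ignored is the design question the comment raises; the check above takes the strict option.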

Comment thread ocs/ocs_feed.py
raise Exception("Block structure does not match: {}".format(self.name))

self.timestamps.extend(block['timestamps'])
self.tags = block.get('influxdb_tags')
Shouldn't it be required that the latest tags match the previous tags?

I think, as written, someone could do this:

  • publish some data with tags={...} fully populated -- these get cached.
  • publish another point, with tags=None -- this gets cached.
  • If the buffer is now flushed, the result will have tags=None, because that was the most recent thing.

That would pass all checks, but the stored tags (when the block gets flushed) would be None. It's probably not what the user wanted. I think either:

  • Require user to pass the same tags every time they push data to this block OR
  • If tags=None, don't change the stored tags in the block.

At some level, data producers are doomed to always provide tags, so the first option makes more sense to me.

Granted, this would require modification to the aggregator agent (so far unchanged), because you'd need to initialize each new Block (in Provider.save_to_block) with influxdb_tags set from the first transmission. That is messier in the present instance but seems logically safer on the whole. I think the best way would be to have a constructor for Block that parses the block data dict (just as Block.append and Block.extend do), and sets all the things from that (including influxdb_tags). This would make Aggregator, in the long run, less fragile vis-à-vis stuff in the feed that it doesn't care about.
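The constructor-plus-strict-extend pattern suggested above might look something like this. This is a simplified, assumed sketch; the real ocs.ocs_feed.Block carries more state and validation, and the exact error type is an assumption:

```python
class Block:
    def __init__(self, block):
        """Initialize all state, including influxdb_tags, from the
        first transmitted block dict."""
        self.name = block['block_name']
        self.timestamps = list(block['timestamps'])
        self.data = {k: list(v) for k, v in block['data'].items()}
        # Tags are fixed at construction, from the first transmission.
        self.tags = block.get('influxdb_tags')

    def extend(self, block):
        """Append data, requiring the same tags on every publish."""
        if block.get('influxdb_tags') != self.tags:
            raise ValueError(f"influxdb_tags changed for block {self.name}")
        self.timestamps.extend(block['timestamps'])
        for k, v in block['data'].items():
            self.data[k].extend(v)
```

With this shape, the tags=None-after-tags={...} sequence described above fails loudly at extend() time instead of silently clearing the cached tags before a flush.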

Base automatically changed from koopman/influxdb-format-data-refactor to main May 5, 2026 23:45
@BrianJKoopman (Member, Author):
Thanks for the review, I'll address your comments soon.

A quick note to myself, I shouldn't have squashed #466. Before merging this I should rebase on main and drop the first three commits.
