Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions homeassistant/components/generic_thermostat/climate.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from functools import partial
import logging
import math
import time
from typing import Any

import voluptuous as vol
Expand Down Expand Up @@ -51,6 +52,7 @@
)
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.device import async_entity_id_to_device
from homeassistant.helpers.entity import CONTEXT_RECENT_TIME_SECONDS
from homeassistant.helpers.entity_platform import (
AddConfigEntryEntitiesCallback,
AddEntitiesCallback,
Expand Down Expand Up @@ -478,6 +480,7 @@ async def _async_sensor_changed(self, event: Event[EventStateChangedData]) -> No
if new_state is None or new_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
return

self.async_set_context(event.context)
self._async_update_temp(new_state)
await self._async_control_heating()
self.async_write_ha_state()
Expand Down Expand Up @@ -531,9 +534,11 @@ def _async_update_temp(self, state: State) -> None:
_LOGGER.error("Unable to update from sensor: %s", ex)

async def _async_control_heating(
self, time: datetime | None = None, force: bool = False
self, _time: datetime | None = None, force: bool = False
) -> None:
"""Check if we need to turn heating on or off."""
called_by_timer = _time is not None

async with self._temp_lock:
if not self._active and None not in (
self._cur_temp,
Expand All @@ -552,7 +557,7 @@ async def _async_control_heating(
if not self._active or self._hvac_mode == HVACMode.OFF:
return

if force and time is not None and self.max_cycle_duration:
if force and called_by_timer and self.max_cycle_duration:
# We were invoked due to `max_cycle_duration`, so turn off
_LOGGER.debug(
"Turning off heater %s due to max cycle time of %s",
Expand Down Expand Up @@ -587,7 +592,7 @@ async def _async_control_heating(
now - self._last_toggled_time + self.min_cycle_duration,
self._async_timer_control_heating,
)
elif time is not None:
elif called_by_timer:
# This is a keep-alive call, so ensure it's on
_LOGGER.debug(
"Keep-alive - Turning on heater %s",
Expand All @@ -609,7 +614,7 @@ async def _async_control_heating(
now - self._last_toggled_time + self.cycle_cooldown,
self._async_timer_control_heating,
)
elif time is not None:
elif called_by_timer:
# This is a keep-alive call, so ensure it's off
_LOGGER.debug(
"Keep-alive - Turning off heater %s", self.heater_entity_id
Expand All @@ -624,13 +629,25 @@ def _is_device_active(self) -> bool | None:

return self.hass.states.is_state(self.heater_entity_id, STATE_ON)

def _get_current_context(self) -> Context | None:
"""Return the current context if it is still recent, or None."""
if (
self._context_set is not None
and time.time() - self._context_set > CONTEXT_RECENT_TIME_SECONDS
):
self._context = None
self._context_set = None
return self._context

async def _async_heater_turn_on(self, keepalive: bool = False) -> None:
"""Turn heater toggleable device on."""
data = {ATTR_ENTITY_ID: self.heater_entity_id}
# Create a new context for this service call so we can identify
# the resulting state change event as originating from us
new_context = Context(parent_id=self._context.id if self._context else None)
self.async_set_context(new_context)
# Create a child context for the switch service call so we can
# identify the resulting state change event as originating from us.
# Don't set it as our own context — the climate entity's state changes
# should remain attributed to the parent context (e.g., set_hvac_mode).
current_context = self._get_current_context()
new_context = Context(parent_id=current_context.id if current_context else None)
self._last_context_id = new_context.id
await self.hass.services.async_call(
HOMEASSISTANT_DOMAIN, SERVICE_TURN_ON, data, context=new_context
Expand All @@ -654,10 +671,12 @@ async def _async_heater_turn_on(self, keepalive: bool = False) -> None:
async def _async_heater_turn_off(self, keepalive: bool = False) -> None:
"""Turn heater toggleable device off."""
data = {ATTR_ENTITY_ID: self.heater_entity_id}
# Create a new context for this service call so we can identify
# the resulting state change event as originating from us
new_context = Context(parent_id=self._context.id if self._context else None)
self.async_set_context(new_context)
# Create a child context for the switch service call so we can
# identify the resulting state change event as originating from us.
# Don't set it as our own context — the climate entity's state changes
# should remain attributed to the parent context (e.g., set_hvac_mode).
current_context = self._get_current_context()
new_context = Context(parent_id=current_context.id if current_context else None)
self._last_context_id = new_context.id
await self.hass.services.async_call(
HOMEASSISTANT_DOMAIN, SERVICE_TURN_OFF, data, context=new_context
Expand Down
29 changes: 23 additions & 6 deletions homeassistant/components/logbook/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
ATTR_DEVICE_ID,
ATTR_DOMAIN,
ATTR_ENTITY_ID,
ATTR_SERVICE_DATA,
ATTR_UNIT_OF_MEASUREMENT,
EVENT_CALL_SERVICE,
EVENT_LOGBOOK_ENTRY,
EVENT_STATE_CHANGED,
)
Expand Down Expand Up @@ -104,10 +106,22 @@ def async_determine_event_types(


@callback
def extract_attr(source: Mapping[str, Any], attr: str) -> list[str]:
"""Extract an attribute as a list or string."""
def extract_attr(
event_type: EventType[Any] | str, source: Mapping[str, Any], attr: str
) -> list[str]:
"""Extract an attribute as a list or string.

For EVENT_CALL_SERVICE events, the entity_id is inside service_data,
not at the top level. Check service_data as a fallback.
"""
if (value := source.get(attr)) is None:
return []
# Early return to avoid unnecessary dict lookups for non-service events
if event_type != EVENT_CALL_SERVICE:
return []
if service_data := source.get(ATTR_SERVICE_DATA):
value = service_data.get(attr)
if value is None:
return []
if isinstance(value, list):
return value
return str(value).split(",")
Expand Down Expand Up @@ -135,7 +149,7 @@ def event_forwarder_filtered(
def _forward_events_filtered_by_entities_filter(event: Event) -> None:
assert entities_filter is not None
event_data = event.data
entity_ids = extract_attr(event_data, ATTR_ENTITY_ID)
entity_ids = extract_attr(event.event_type, event_data, ATTR_ENTITY_ID)
if entity_ids and not any(
entities_filter(entity_id) for entity_id in entity_ids
):
Expand All @@ -157,9 +171,12 @@ def _forward_events_filtered_by_entities_filter(event: Event) -> None:
@callback
def _forward_events_filtered_by_device_entity_ids(event: Event) -> None:
event_data = event.data
event_type = event.event_type
if entity_ids_set.intersection(
extract_attr(event_data, ATTR_ENTITY_ID)
) or device_ids_set.intersection(extract_attr(event_data, ATTR_DEVICE_ID)):
extract_attr(event_type, event_data, ATTR_ENTITY_ID)
) or device_ids_set.intersection(
extract_attr(event_type, event_data, ATTR_DEVICE_ID)
):
target(event)

return _forward_events_filtered_by_device_entity_ids
Expand Down
5 changes: 4 additions & 1 deletion homeassistant/components/logbook/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,10 @@ def async_event_to_row(event: Event) -> EventAsRow:
# that are missing new_state or old_state
# since the logbook does not show these
new_state: State = event.data["new_state"]
context = new_state.context
# Use the event's context rather than the state's context because
# State.expire() replaces the context with a copy that loses
# origin_event, which is needed for context augmentation.
context = event.context
return EventAsRow(
row_id=hash(event),
event_type=None,
Expand Down
121 changes: 115 additions & 6 deletions homeassistant/components/logbook/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
from __future__ import annotations

from collections.abc import Callable, Generator, Sequence
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime as dt
import logging
import time
from typing import TYPE_CHECKING, Any

from lru import LRU
from sqlalchemy.engine import Result
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session

from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.filters import Filters
Expand All @@ -37,6 +39,7 @@
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers import entity_registry as er
from homeassistant.util import dt as dt_util
from homeassistant.util.collection import chunked_or_all
from homeassistant.util.event_type import EventType

from .const import (
Expand Down Expand Up @@ -80,10 +83,18 @@
async_event_to_row,
)
from .queries import statement_for_request
from .queries.common import PSEUDO_EVENT_STATE_CHANGED
from .queries.common import (
PSEUDO_EVENT_STATE_CHANGED,
select_context_user_ids_for_context_ids,
)

_LOGGER = logging.getLogger(__name__)

# Bound for the parent-context user-id cache — only needs to bridge the
# historical→live handoff, so the in-flight set is realistically ~tens with
# peak bursts of ~100. Ceiling bounds memory in pathological cases.
MAX_CONTEXT_USER_IDS_CACHE = 256


@dataclass(slots=True)
class LogbookRun:
Expand All @@ -99,6 +110,14 @@ class LogbookRun:
include_entity_name: bool
timestamp: bool
memoize_new_contexts: bool = True
# True when this run will switch to a live stream; gates population of
# context_user_ids (wasted work for one-shot REST/get_events callers).
for_live_stream: bool = False
# context_id -> user_id for parent context attribution; persisted across
# batches so child rows can inherit user_id from a parent seen earlier.
context_user_ids: LRU[bytes, bytes] = field(
default_factory=lambda: LRU(MAX_CONTEXT_USER_IDS_CACHE)
)


class EventProcessor:
Expand All @@ -113,6 +132,7 @@ def __init__(
context_id: str | None = None,
timestamp: bool = False,
include_entity_name: bool = True,
for_live_stream: bool = False,
) -> None:
"""Init the event stream."""
assert not (context_id and (entity_ids or device_ids)), (
Expand All @@ -133,6 +153,7 @@ def __init__(
entity_name_cache=EntityNameCache(self.hass),
include_entity_name=include_entity_name,
timestamp=timestamp,
for_live_stream=for_live_stream,
)
self.context_augmenter = ContextAugmenter(self.logbook_run)

Expand Down Expand Up @@ -180,13 +201,67 @@ def get_events(
self.filters,
self.context_id,
)
return self.humanify(
execute_stmt_lambda_element(session, stmt, orm_rows=False)
rows = execute_stmt_lambda_element(session, stmt, orm_rows=False)
query_parent_user_ids: dict[bytes, bytes] | None = None
if self.entity_ids or self.device_ids:
# Filtered queries exclude parent call_service rows for
# unrelated targets, so child contexts lose user attribution
# without a pre-pass. all_stmt already includes them.
rows = list(rows)
query_parent_user_ids = self._fetch_parent_user_ids(
session, rows, instance.max_bind_vars
)
return self.humanify(rows, query_parent_user_ids)

def _fetch_parent_user_ids(
self,
session: Session,
rows: list[Row],
max_bind_vars: int,
) -> dict[bytes, bytes] | None:
"""Resolve parent-context user_ids for rows in a filtered query.

Done in Python rather than as a SQL union branch because the
context_parent_id_bin column is sparsely populated — scanning the
States table for non-null parents costs ~40% of the overall query
on real datasets. Here we collect only the parent ids we actually
need and fetch them via an indexed point-lookup on context_id_bin.
"""
cache = self.logbook_run.context_user_ids
pending: set[bytes] = {
parent_id
for row in rows
if (parent_id := row[CONTEXT_PARENT_ID_BIN_POS]) and parent_id not in cache
}
if not pending:
return None
query_parent_user_ids: dict[bytes, bytes] = {}
# The lambda statement unions events and states, so each id appears
# in two IN clauses — halve the chunk size to stay under the
# database's max bind variable count.
for pending_chunk in chunked_or_all(pending, max_bind_vars // 2):
# Schema allows NULL but the query's WHERE clauses exclude it;
# explicit checks satisfy the type checker.
query_parent_user_ids.update(
{
parent_id: user_id
for parent_id, user_id in execute_stmt_lambda_element(
session,
select_context_user_ids_for_context_ids(pending_chunk),
orm_rows=False,
)
if parent_id is not None and user_id is not None
}
)
if self.logbook_run.for_live_stream:
cache.update(query_parent_user_ids)
return query_parent_user_ids

def humanify(
self, rows: Generator[EventAsRow] | Sequence[Row] | Result
) -> list[dict[str, str]]:
self,
rows: Generator[EventAsRow] | Sequence[Row] | Result,
query_parent_user_ids: dict[bytes, bytes] | None = None,
) -> list[dict[str, Any]]:
"""Humanify rows."""
return list(
_humanify(
Expand All @@ -195,6 +270,7 @@ def humanify(
self.ent_reg,
self.logbook_run,
self.context_augmenter,
query_parent_user_ids,
)
)

Expand All @@ -205,6 +281,7 @@ def _humanify(
ent_reg: er.EntityRegistry,
logbook_run: LogbookRun,
context_augmenter: ContextAugmenter,
query_parent_user_ids: dict[bytes, bytes] | None,
) -> Generator[dict[str, Any]]:
"""Generate a converted list of events into entries."""
# Continuous sensors, will be excluded from the logbook
Expand All @@ -220,11 +297,21 @@ def _humanify(
context_id_bin: bytes
data: dict[str, Any]

context_user_ids = logbook_run.context_user_ids
# Skip the LRU write on one-shot runs — the LogbookRun is discarded.
populate_context_user_ids = logbook_run.for_live_stream

# Process rows
for row in rows:
context_id_bin = row[CONTEXT_ID_BIN_POS]
if memoize_new_contexts and context_id_bin not in context_lookup:
context_lookup[context_id_bin] = row
if (
populate_context_user_ids
and (context_user_id_bin := row[CONTEXT_USER_ID_BIN_POS])
and context_id_bin not in context_user_ids
):
context_user_ids[context_id_bin] = context_user_id_bin
if row[CONTEXT_ONLY_POS]:
continue
event_type = row[EVENT_TYPE_POS]
Expand Down Expand Up @@ -311,6 +398,28 @@ def _humanify(
):
context_augmenter.augment(data, context_row)

# Fall back to the parent context for child contexts that inherit
# user attribution (e.g., generic_thermostat -> switch turn_on).
# Read from context_lookup directly instead of get_context() to
# avoid the origin_event fallback which would return the *child*
# row's origin event, not the parent's.
if CONTEXT_USER_ID not in data and (
context_parent_id_bin := row[CONTEXT_PARENT_ID_BIN_POS]
):
parent_user_id_bin: bytes | None = context_user_ids.get(
context_parent_id_bin
)
if parent_user_id_bin is None and query_parent_user_ids is not None:
parent_user_id_bin = query_parent_user_ids.get(context_parent_id_bin)
if (
parent_user_id_bin is None
and (parent_row := context_lookup.get(context_parent_id_bin))
is not None
):
parent_user_id_bin = parent_row[CONTEXT_USER_ID_BIN_POS]
if parent_user_id_bin:
data[CONTEXT_USER_ID] = bytes_to_uuid_hex_or_none(parent_user_id_bin)

yield data


Expand Down
Loading
Loading