Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test-warehouse.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,9 @@ jobs:
# using ".[vertica]" would re-resolve dbt-vertica's deps and downgrade
# dbt-core to ~=1.8. Install elementary without the adapter extra.
if [ "${{ inputs.warehouse-type }}" = "vertica" ]; then
pip install "."
pip install ".[s3,gcs,azure,slack]"
else
pip install ".[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || inputs.warehouse-type }}]"
pip install ".[${{ (inputs.warehouse-type == 'databricks_catalog' && 'databricks') || inputs.warehouse-type }},s3,gcs,azure,slack]"
fi

- name: Write dbt profiles
Expand Down
14 changes: 8 additions & 6 deletions elementary/clients/slack/slack_message_builder.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from enum import Enum
from typing import List, Optional, Union

from slack_sdk.models.blocks import HeaderBlock, SectionBlock

from elementary.clients.slack.schema import SlackBlocksType, SlackMessageSchema
from elementary.utils.json_utils import unpack_and_flatten_str_to_list
from elementary.utils.pydantic_shim import BaseModel

# Slack Block Kit limits (avoid module-level slack_sdk import).
_HEADER_TEXT_MAX_LENGTH = 150
_SECTION_TEXT_MAX_LENGTH = 3000


class OptionSchema(BaseModel):
value: str
Expand Down Expand Up @@ -56,11 +58,11 @@ def _add_blocks_as_attachments(self, blocks: SlackBlocksType):

@staticmethod
def get_limited_markdown_msg(section_msg: str) -> str:
if len(section_msg) < SectionBlock.text_max_length:
if len(section_msg) < _SECTION_TEXT_MAX_LENGTH:
return section_msg
return (
section_msg[
: SectionBlock.text_max_length
: _SECTION_TEXT_MAX_LENGTH
- len(SlackMessageBuilder._CONTINUATION_SYMBOL)
- SlackMessageBuilder._LONGEST_MARKDOWN_SUFFIX_LEN
]
Expand Down Expand Up @@ -120,8 +122,8 @@ def create_context_block(context_msgs: list) -> dict:

@staticmethod
def create_header_block(msg: str) -> dict:
if len(msg) > HeaderBlock.text_max_length:
final_msg = msg[: HeaderBlock.text_max_length - 3] + "..."
if len(msg) > _HEADER_TEXT_MAX_LENGTH:
final_msg = msg[: _HEADER_TEXT_MAX_LENGTH - 3] + "..."
else:
final_msg = msg

Expand Down
8 changes: 5 additions & 3 deletions elementary/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
from pathlib import Path
from typing import Optional

import google.auth # type: ignore[import]
from dateutil import tz
from google.auth.exceptions import DefaultCredentialsError # type: ignore[import]

from elementary.exceptions.exceptions import InvalidArgumentsError
from elementary.monitor.alerts.grouping_type import GroupingType
Expand Down Expand Up @@ -265,9 +263,13 @@ def has_gcloud(self):
if self.google_service_account_path:
return True
try:
import google.auth # type: ignore[import]

google.auth.default()
return True
except DefaultCredentialsError:
except ImportError:
return False
except google.auth.exceptions.DefaultCredentialsError:
return False

@property
Expand Down
13 changes: 8 additions & 5 deletions elementary/messages/formats/block_kit.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
from typing import Any, Callable, List, Optional, Tuple

from slack_sdk.models import blocks as slack_blocks
from tabulate import tabulate

from elementary.messages.blocks import (
Expand Down Expand Up @@ -105,11 +104,15 @@ def _format_table_cell(self, cell_value: Any, column_count: int) -> str:
return value[: max_cell_length - 2] + ".."
return value

# Slack Block Kit limits (avoid module-level slack_sdk import).
_SECTION_TEXT_MAX_LENGTH = 3000
_HEADER_TEXT_MAX_LENGTH = 150

def _format_markdown_section_text(self, text: str) -> dict:
if len(text) > slack_blocks.SectionBlock.text_max_length:
if len(text) > self._SECTION_TEXT_MAX_LENGTH:
text = (
text[
: slack_blocks.SectionBlock.text_max_length
: self._SECTION_TEXT_MAX_LENGTH
- len("...")
- self._LONGEST_MARKDOWN_SUFFIX_LEN
]
Expand Down Expand Up @@ -198,8 +201,8 @@ def _add_lines_block(self, block: LinesBlock) -> None:
self._add_block(self._format_markdown_section("\n".join(formatted_lines)))

def _add_header_block(self, block: HeaderBlock) -> None:
if len(block.text) > slack_blocks.HeaderBlock.text_max_length:
text = block.text[: slack_blocks.HeaderBlock.text_max_length - 3] + "..."
if len(block.text) > self._HEADER_TEXT_MAX_LENGTH:
text = block.text[: self._HEADER_TEXT_MAX_LENGTH - 3] + "..."
else:
text = block.text
self._add_block(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,9 @@
BaseMessagingIntegration,
DestinationType,
)
from elementary.messages.messaging_integrations.slack_web import (
SlackWebMessagingIntegration,
)
from elementary.messages.messaging_integrations.slack_webhook import (
SlackWebhookMessagingIntegration,
)
from elementary.messages.messaging_integrations.teams_webhook import (
TeamsWebhookMessagingIntegration,
)
from elementary.monitor.data_monitoring.alerts.integrations.base_integration import (
BaseIntegration,
)
from elementary.monitor.data_monitoring.alerts.integrations.slack.slack import (
SlackIntegration,
)
from elementary.tracking.tracking_interface import Tracking
from elementary.utils.log import get_logger
from elementary.utils.ssl import create_ssl_context
Expand All @@ -44,6 +32,16 @@ def get_integration(
tracking: Optional[Tracking] = None,
) -> Union[BaseMessagingIntegration, BaseIntegration]:
if config.has_slack:
from elementary.messages.messaging_integrations.slack_web import (
SlackWebMessagingIntegration,
)
from elementary.messages.messaging_integrations.slack_webhook import (
SlackWebhookMessagingIntegration,
)
from elementary.monitor.data_monitoring.alerts.integrations.slack.slack import (
SlackIntegration,
)

ssl_context = create_ssl_context(config.ssl_ca_bundle)
if config.is_slack_workflow:
return SlackIntegration(
Expand All @@ -61,6 +59,10 @@ def get_integration(
else:
raise UnsupportedAlertIntegrationError
elif config.has_teams:
from elementary.messages.messaging_integrations.teams_webhook import (
TeamsWebhookMessagingIntegration,
)

return TeamsWebhookMessagingIntegration(config.teams_webhook)
else:
raise UnsupportedAlertIntegrationError
Expand All @@ -72,6 +74,16 @@ def get_destination(
metadata: dict,
override_config_defaults: bool = False,
) -> DestinationType:
from elementary.messages.messaging_integrations.slack_web import (
SlackWebMessagingIntegration,
)
from elementary.messages.messaging_integrations.slack_webhook import (
SlackWebhookMessagingIntegration,
)
from elementary.messages.messaging_integrations.teams_webhook import (
TeamsWebhookMessagingIntegration,
)

if (
isinstance(integration, TeamsWebhookMessagingIntegration)
and config.has_teams
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Sequence, Union

from slack_sdk.models.blocks import SectionBlock

from elementary.clients.slack.client import SlackClient, SlackWebClient
from elementary.clients.slack.schema import SlackBlocksType, SlackMessageSchema
from elementary.clients.slack.slack_message_builder import MessageColor
from elementary.clients.slack.slack_message_builder import (
_SECTION_TEXT_MAX_LENGTH,
MessageColor,
)
from elementary.config.config import Config
from elementary.monitor.alerts.alerts_groups import AlertsGroup, GroupedByTableAlerts
from elementary.monitor.alerts.alerts_groups.base_alerts_group import BaseAlertsGroup
Expand Down Expand Up @@ -238,7 +239,7 @@ def _get_dbt_test_template(
result.append(self.message_builder.create_context_block(["*Test query*"]))

msg = f"```{alert.test_results_query}```"
if len(msg) > SectionBlock.text_max_length:
if len(msg) > _SECTION_TEXT_MAX_LENGTH:
msg = (
f"_The test query was too long, here's a query to get it._\n"
f"```SELECT test_results_query FROM {alert.elementary_database_and_schema}.elementary_test_results WHERE test_execution_id = '{alert.id}'```"
Expand Down
81 changes: 63 additions & 18 deletions elementary/monitor/data_monitoring/report/data_monitoring_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,12 @@
import webbrowser
from typing import Optional, Tuple

from elementary.clients.azure.client import AzureClient
from elementary.clients.gcs.client import GCSClient
from elementary.clients.s3.client import S3Client
from elementary.clients.slack.client import SlackClient
from elementary.config.config import Config
from elementary.monitor.api.invocations.invocations import InvocationsAPI
from elementary.monitor.api.report.report import ReportAPI
from elementary.monitor.api.report.schema import ReportDataSchema
from elementary.monitor.api.tests.tests import TestsAPI
from elementary.monitor.data_monitoring.data_monitoring import DataMonitoring
from elementary.monitor.data_monitoring.report.slack_report_summary_message_builder import (
SlackReportSummaryMessageBuilder,
)
from elementary.monitor.data_monitoring.schema import FiltersSchema
from elementary.tracking.anonymous_tracking import AnonymousTracking
from elementary.tracking.tracking_interface import Tracking
Expand All @@ -43,14 +36,62 @@ def __init__(
config, tracking, force_update_dbt_package, disable_samples, selector_filter
)
self.report_api = ReportAPI(self.internal_dbt_runner)
self.s3_client = S3Client.create_client(self.config, tracking=self.tracking)
self.gcs_client = GCSClient.create_client(self.config, tracking=self.tracking)
self.azure_client = AzureClient.create_client(
self.config, tracking=self.tracking
)
self.slack_client = SlackClient.create_client(
self.config, tracking=self.tracking
)

self.s3_client = None
if self.config.has_s3:
try:
from elementary.clients.s3.client import S3Client

self.s3_client = S3Client.create_client(
self.config, tracking=self.tracking
)
except ImportError:
logger.warning(
"S3 dependencies are not installed. "
"Install them with: pip install 'elementary-data[s3]'"
)

self.gcs_client = None
if self.config.gcs_bucket_name:
try:
from elementary.clients.gcs.client import GCSClient

self.gcs_client = GCSClient.create_client(
self.config, tracking=self.tracking
)
except ImportError:
logger.warning(
"GCS dependencies are not installed. "
"Install them with: pip install 'elementary-data[gcs]'"
)

self.azure_client = None
if self.config.has_blob:
try:
from elementary.clients.azure.client import AzureClient

self.azure_client = AzureClient.create_client(
self.config, tracking=self.tracking
)
except ImportError:
logger.warning(
"Azure dependencies are not installed. "
"Install them with: pip install 'elementary-data[azure]'"
)

self.slack_client = None
if self.config.has_slack:
try:
from elementary.clients.slack.client import SlackClient

self.slack_client = SlackClient.create_client(
self.config, tracking=self.tracking
)
except ImportError:
logger.warning(
"Slack dependencies are not installed. "
"Install them with: pip install 'elementary-data[slack]'"
)

def generate_report(
self,
Expand Down Expand Up @@ -165,9 +206,9 @@ def _add_report_tracking(
report_data.tracking = dict(
posthog_api_key=self.tracking.POSTHOG_PROJECT_API_KEY,
report_generator_anonymous_user_id=self.tracking.anonymous_user_id,
anonymous_warehouse_id=self.warehouse_info.id
if self.warehouse_info
else None,
anonymous_warehouse_id=(
self.warehouse_info.id if self.warehouse_info else None
),
)

def send_report(
Expand Down Expand Up @@ -298,6 +339,10 @@ def send_test_results_summary(
dbt_invocation=invocation,
)
if self.slack_client:
from elementary.monitor.data_monitoring.report.slack_report_summary_message_builder import (
SlackReportSummaryMessageBuilder,
)

send_succeeded = self.slack_client.send_message(
channel_name=self.config.slack_channel_name,
message=SlackReportSummaryMessageBuilder().get_slack_message(
Expand Down
24 changes: 19 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,25 @@ requests = ">=2.28.1,<3.0.0"
beautifulsoup4 = "<5.0.0"
ratelimit = "*"
posthog = "<3.0.0"
boto3 = "<2.0.0"
google-cloud-storage = ">=2.4,<3.2"
"ruamel.yaml" = "<1.0.0"
alive-progress = "<=2.3.1"
slack-sdk = ">=3.20.1,<4.0.0"

pydantic = "<3.0"
networkx = ">=2.3,<3"
packaging = ">=20.9"
azure-storage-blob = ">=12.11.0"
pymsteams = ">=0.2.2,<1.0.0"
tabulate = ">= 0.9.0"
tenacity = ">=8.0,<10.0"
pytz = ">= 2025.1"

# Cloud storage and notification integrations.
# These will move to optional extras in a future release (Phase 2).
# For now they remain required so existing installs are not broken.
boto3 = "<2.0.0"
google-cloud-storage = ">=2.4,<3.2"
slack-sdk = ">=3.20.1,<4.0.0"
azure-storage-blob = ">=12.11.0"
pymsteams = ">=0.2.2,<1.0.0"

dbt-snowflake = {version = ">=1.8,<2.0.0", optional = true}
dbt-bigquery = {version = ">=1.8,<2.0.0", optional = true}
dbt-redshift = {version = ">=1.8,<2.0.0", optional = true}
Expand All @@ -60,6 +64,7 @@ dbt-fabric = {version = ">=1.8,<2.0.0", optional = true}
dbt-fabricspark = {version = ">=1.8,<2.0.0", optional = true}
dbt-sqlserver = {version = ">=1.8,<2.0.0", optional = true}
dbt-vertica = {version = ">=1.8,<2.0.0", optional = true}

[tool.poetry.extras]
snowflake = ["dbt-snowflake"]
bigquery = ["dbt-bigquery"]
Expand All @@ -76,6 +81,15 @@ fabric = ["dbt-fabric"]
fabricspark = ["dbt-fabricspark"]
sqlserver = ["dbt-sqlserver"]
vertica = ["dbt-vertica"]

# Cloud storage and notification extras (Phase 1: empty, deps are still required).
# In Phase 2 these will reference the actual optional dependencies.
s3 = []
gcs = []
azure = []
slack = []
msteams = []

# dbt-fabricspark is excluded due to broken upstream dependencies (azure-cli pre-release pins).
# dbt-vertica is excluded because it pins dbt-core==1.8.5, forcing the entire resolution to dbt 1.8.
# Both are still available as individual extras (e.g. pip install elementary-data[vertica]).
Expand Down
Loading