Merged
6 changes: 5 additions & 1 deletion config/locales/en.yml
@@ -119,7 +119,11 @@ en:
airbyte_stream_config_path:
description: |
Path of airbyte stream config file relative to the App root (Rails.root)
default: /terraform/aks/workspace-variables/airbyte_streams.json
default: /terraform/aks/workspace-variables/airbyte_stream_config.json
airbyte_internal_dataset:
description: |
Airbyte internal dataset for service
default: ENV['AIRBYTE_INTERNAL_DATASET']
airbyte_client_id:
description: |
Airbyte Client Id for retrieving access token
17 changes: 14 additions & 3 deletions docs/airbyte.md
@@ -43,10 +43,21 @@ Main configuration options for Airbyte:
- Following a schema migration, the Airbyte connection config can be regenerated with the Rails rake task:<br>
`rake dfe:analytics:airbyte_connection_refresh`

- Following a DevOps deployment, all required Airbyte deployment tasks (see below) can be executed with the Rails rake task:<br>
`rake dfe:analytics:airbyte_deploy_tasks`

- The tasks required following a DevOps deployment are summarised below:
  - Wait for any pending Rails database migrations to finish
  - Refresh the Airbyte connection to pick up any schema changes
  - Retrieve the status of the last Airbyte sync job
  - If the last sync job's status is not 'running', start a new sync
  - Wait for the sync job (existing or new) to finish - the wait time is currently up to 1 hour to cater for large databases
  - Apply hidden policy tags to all PII fields in the final Airbyte tables
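The "start a new sync only if one is not already running" decision in the steps above can be sketched in plain Ruby. The helper name is hypothetical (the gem's actual logic lives inside `AirbyteDeployJob#perform`); `last_job` mirrors the Airbyte API response shape `{ 'job' => { 'id' => ..., 'status' => ... } }`:

```ruby
# Reuse the last Airbyte job if it is still running; otherwise start a
# new sync and wait on that one instead. `start_sync` stands in for the
# API call that kicks off a sync and returns the new job id.
def job_to_wait_on(last_job, start_sync)
  status = last_job&.dig('job', 'status')
  job_id = last_job&.dig('job', 'id')

  status == 'running' ? job_id : start_sync.call
end
```

The safe-navigation `&.dig` means a missing or empty job history simply falls through to starting a fresh sync.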

- DfE Analytics will still be used to stream the following event types to BigQuery:
- Web request events
- API request events
- Custom events
- Web request events
- API request events
- Custom events

## Airbyte Configuration

5 changes: 2 additions & 3 deletions lib/dfe/analytics.rb
@@ -5,6 +5,7 @@
require 'httparty'
require 'google/cloud/bigquery'
require 'dfe/analytics/activerecord' if defined?(ActiveRecord)
require 'dfe/analytics/jobs'
require 'dfe/analytics/config'
require 'dfe/analytics/event_schema'
require 'dfe/analytics/fields'
@@ -13,12 +14,10 @@
require 'dfe/analytics/concerns/requestable'
require 'dfe/analytics/event'
require 'dfe/analytics/event_matcher'
require 'dfe/analytics/analytics_job'
require 'dfe/analytics/send_events'
require 'dfe/analytics/load_entities'
require 'dfe/analytics/load_entity_batch'
require 'dfe/analytics/requests'
require 'dfe/analytics/entity_table_check_job'
require 'dfe/analytics/initialisation_events'
require 'dfe/analytics/version'
require 'dfe/analytics/middleware/request_identity'
@@ -29,7 +28,7 @@
require 'dfe/analytics/azure_federated_auth'
require 'dfe/analytics/api_requests'
require 'dfe/analytics/airbyte_stream_config'
require 'dfe/analytics/big_query_apply_policy_tags'
require 'dfe/analytics/services'
require 'services/airbyte'

module DfE
1 change: 1 addition & 0 deletions lib/dfe/analytics/activerecord.rb
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require 'dfe/analytics/shared/service_pattern'
require 'dfe/analytics/services/entity_table_checks'
require 'dfe/analytics/services/checksum_calculator'
require 'dfe/analytics/services/generic_checksum_calculator'
10 changes: 0 additions & 10 deletions lib/dfe/analytics/analytics_job.rb

This file was deleted.

6 changes: 3 additions & 3 deletions lib/dfe/analytics/big_query_api.rb
@@ -81,12 +81,12 @@ def self.error_message_for(response)
"DfE::Analytics BigQuery API insert error for #{response.insert_errors.length} event(s):\n#{message}"
end

def self.apply_policy_tags(tables, policy_tag)
def self.apply_policy_tags(dataset, tables, policy_tag)
tables.each do |table_name, column_names|
begin
table = client.get_table(
DfE::Analytics.config.bigquery_project_id,
DfE::Analytics.config.bigquery_airbyte_dataset,
dataset,
table_name.to_s
)
rescue Google::Apis::ClientError => e
@@ -106,7 +106,7 @@ def self.apply_policy_tags(tables, policy_tag)
begin
client.patch_table(
DfE::Analytics.config.bigquery_project_id,
DfE::Analytics.config.bigquery_airbyte_dataset,
dataset,
table_name.to_s,
updated_table,
fields: 'schema'
30 changes: 0 additions & 30 deletions lib/dfe/analytics/big_query_apply_policy_tags.rb

This file was deleted.

2 changes: 2 additions & 0 deletions lib/dfe/analytics/config.rb
@@ -34,6 +34,7 @@ module Config
excluded_models_proc
database_events_enabled
airbyte_enabled
airbyte_internal_dataset
airbyte_stream_config_path
airbyte_client_id
airbyte_client_secret
@@ -69,6 +70,7 @@ def self.configure(config)
config.excluded_models_proc ||= proc { |_model| false }
config.database_events_enabled ||= true
config.airbyte_enabled ||= false
config.airbyte_internal_dataset ||= ENV.fetch('AIRBYTE_INTERNAL_DATASET', 'airbyte_internal')
config.airbyte_client_id ||= ENV.fetch('AIRBYTE_CLIENT_ID', nil)
config.airbyte_client_secret ||= ENV.fetch('AIRBYTE_CLIENT_SECRET', nil)
config.airbyte_server_url ||= ENV.fetch('AIRBYTE_SERVER_URL', nil)
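The `||=` plus `ENV.fetch` pattern in `configure` gives each setting a fallback chain: an explicitly configured value wins, then the environment variable, then a built-in default. A self-contained sketch of the new `airbyte_internal_dataset` default, with a plain `Struct` standing in for the gem's config object:

```ruby
# Fallback chain: explicit config value -> environment variable -> built-in default.
Config = Struct.new(:airbyte_internal_dataset)

def apply_defaults(config)
  # `||=` only assigns when the setting is still nil, so an explicit
  # value set by the host app is never overwritten.
  config.airbyte_internal_dataset ||= ENV.fetch('AIRBYTE_INTERNAL_DATASET', 'airbyte_internal')
  config
end
```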
19 changes: 0 additions & 19 deletions lib/dfe/analytics/entity_table_check_job.rb

This file was deleted.

6 changes: 6 additions & 0 deletions lib/dfe/analytics/jobs.rb
@@ -0,0 +1,6 @@
# frozen_string_literal: true

require 'dfe/analytics/jobs/analytics_job'
require 'dfe/analytics/jobs/airbyte_deploy_job'
require 'dfe/analytics/jobs/big_query_apply_policy_tags_job'
require 'dfe/analytics/jobs/entity_table_check_job'
51 changes: 51 additions & 0 deletions lib/dfe/analytics/jobs/airbyte_deploy_job.rb
@@ -0,0 +1,51 @@
# frozen_string_literal: true

module DfE
module Analytics
module Jobs
# Orchestration job for airbyte deployment
class AirbyteDeployJob < AnalyticsJob
queue_as :default
# No retries – discard on any StandardError
discard_on StandardError

def perform
# Wait for any pending migrations to finish
DfE::Analytics::Services::WaitForMigrations.call

Rails.logger.info('Finished WaitForMigrations')

access_token = ::Services::Airbyte::AccessToken.call
connection_id, source_id = ::Services::Airbyte::ConnectionList.call(access_token:)

# Refresh schema
::Services::Airbyte::ConnectionRefresh.call(access_token:, connection_id:, source_id:)

Rails.logger.info('Finished ConnectionRefresh')

# Check if a sync job is already running
last_job = ::Services::Airbyte::JobLast.call(access_token:, connection_id:)
status = last_job&.dig('job', 'status')
job_id = last_job&.dig('job', 'id')

Rails.logger.info("JobLast status: #{status} id: #{job_id}")

job_id = ::Services::Airbyte::StartSync.call(access_token:, connection_id:) if status != 'running'

# Wait for the job (existing or new) to finish
::Services::Airbyte::WaitForSync.call(access_token:, connection_id:, job_id:)

Rails.logger.info('Finished WaitForSync')

# Trigger policy tagging for final tables
DfE::Analytics::Services::ApplyAirbyteFinalTablesPolicyTags.call

Rails.logger.info('Finished AirbyteDeployJob')
rescue StandardError => e
Rails.logger.error(e.message)
raise "AirbyteDeployJob failed: #{e.message}"
end
end
end
end
end
13 changes: 13 additions & 0 deletions lib/dfe/analytics/jobs/analytics_job.rb
@@ -0,0 +1,13 @@
module DfE
module Analytics
module Jobs
# Base class for all DfE::Analytics jobs
class AnalyticsJob < ActiveJob::Base
queue_as { DfE::Analytics.config.queue }

wait_option = Rails::VERSION::STRING >= '7.1' ? :polynomially_longer : :exponentially_longer
retry_on StandardError, wait: wait_option, attempts: 5
end
end
end
end
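The `wait_option` switch above exists because Rails 7.1 renamed ActiveJob's `:exponentially_longer` backoff to `:polynomially_longer`. The same decision can be expressed with `Gem::Version`, which compares version numbers numerically rather than lexicographically (a plain string comparison would misorder e.g. `'10.0'` against `'7.1'`):

```ruby
# Pick the ActiveJob retry backoff symbol for a given Rails version.
# Gem::Version parses the version string into numeric segments before comparing.
def retry_wait_option(rails_version)
  if Gem::Version.new(rails_version) >= Gem::Version.new('7.1')
    :polynomially_longer
  else
    :exponentially_longer
  end
end
```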
29 changes: 29 additions & 0 deletions lib/dfe/analytics/jobs/big_query_apply_policy_tags_job.rb
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module DfE
module Analytics
module Jobs
# Applies BigQuery hidden policy tags to PII fields in the airbyte tables
class BigQueryApplyPolicyTagsJob < AnalyticsJob
def self.do(dataset:, tables:, policy_tag:, delay_in_minutes: 0)
if delay_in_minutes.zero?
perform_later(dataset, tables, policy_tag)
else
time_to_run = Time.zone.now + delay_in_minutes.minutes

set(wait_until: time_to_run).perform_later(dataset, tables, policy_tag)
end
end

def perform(dataset, tables, policy_tag)
unless DfE::Analytics.airbyte_enabled?
Rails.logger.warn('DfE::Analytics::Jobs::BigQueryApplyPolicyTagsJob.perform called but airbyte is disabled. Please check DfE::Analytics.airbyte_enabled? before applying policy tags in BigQuery')
return
end

DfE::Analytics::BigQueryApi.apply_policy_tags(dataset, tables, policy_tag)
end
end
end
end
end
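The `.do` helper above either enqueues immediately or schedules the job for a future time via `set(wait_until:)`. That branch can be sketched in isolation; the lambdas below are hypothetical stand-ins for ActiveJob's `perform_later` and `set(wait_until:).perform_later`, and `now` is injectable so the arithmetic is testable:

```ruby
# Enqueue immediately for a zero delay, otherwise compute a future run
# time and schedule for it (delay is given in minutes).
def schedule(delay_in_minutes, enqueue_now:, enqueue_at:, now: Time.now)
  if delay_in_minutes.zero?
    enqueue_now.call
  else
    enqueue_at.call(now + delay_in_minutes * 60)
  end
end
```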
21 changes: 21 additions & 0 deletions lib/dfe/analytics/jobs/entity_table_check_job.rb
@@ -0,0 +1,21 @@
# frozen_string_literal: true

require 'active_support/values/time_zone'

module DfE
module Analytics
module Jobs
# To ensure BigQuery is in sync with the database
class EntityTableCheckJob < AnalyticsJob
def perform
return unless DfE::Analytics.enabled? && DfE::Analytics.entity_table_checks_enabled?

entity_tag = Time.now.strftime('%Y%m%d%H%M%S')
DfE::Analytics.entities_for_analytics.each do |entity_name|
DfE::Analytics::Services::EntityTableChecks.call(entity_name: entity_name, entity_type: 'entity_table_check', entity_tag: entity_tag)
end
end
end
end
end
end
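The `entity_tag` built in `perform` above stamps every entity checked in one run with the same sortable timestamp, so the resulting checks can be grouped per run in BigQuery. A minimal sketch with the current time injectable for testing:

```ruby
# One compact, lexicographically sortable tag per run (YYYYMMDDHHMMSS).
def entity_tag(now = Time.now)
  now.strftime('%Y%m%d%H%M%S')
end
```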
2 changes: 1 addition & 1 deletion lib/dfe/analytics/load_entity_batch.rb
@@ -1,6 +1,6 @@
module DfE
module Analytics
class LoadEntityBatch < AnalyticsJob
class LoadEntityBatch < DfE::Analytics::Jobs::AnalyticsJob
# https://cloud.google.com/bigquery/quotas#streaming_inserts
# at a batch size of 500, this allows 20kb per record
BQ_BATCH_MAX_BYTES = 10_000_000
2 changes: 1 addition & 1 deletion lib/dfe/analytics/railtie.rb
@@ -21,7 +21,7 @@ class Railtie < Rails::Railtie

initializer 'dfe.analytics.logger' do
ActiveSupport.on_load(:active_job) do
analytics_job = DfE::Analytics::AnalyticsJob
analytics_job = DfE::Analytics::Jobs::AnalyticsJob
# Rails < 6.1 doesn't support log_arguments = false so we only log warn
# to prevent wild log inflation
if analytics_job.respond_to?(:log_arguments=)
2 changes: 1 addition & 1 deletion lib/dfe/analytics/send_events.rb
@@ -2,7 +2,7 @@

module DfE
module Analytics
class SendEvents < AnalyticsJob
class SendEvents < DfE::Analytics::Jobs::AnalyticsJob
def self.do(events)
unless DfE::Analytics.enabled?
Rails.logger.warn('DfE::Analytics::SendEvents.do() called but DfE::Analytics is disabled. Please check DfE::Analytics.enabled? before sending events to BigQuery')
11 changes: 11 additions & 0 deletions lib/dfe/analytics/services.rb
@@ -0,0 +1,11 @@
# frozen_string_literal: true

require 'dfe/analytics/shared/service_pattern'

require 'dfe/analytics/services/apply_airbyte_final_tables_policy_tags'
require 'dfe/analytics/services/apply_airbyte_internal_tables_policy_tags'
require 'dfe/analytics/services/checksum_calculator'
require 'dfe/analytics/services/entity_table_checks'
require 'dfe/analytics/services/generic_checksum_calculator'
require 'dfe/analytics/services/postgres_checksum_calculator'
require 'dfe/analytics/services/wait_for_migrations'
@@ -0,0 +1,25 @@
# frozen_string_literal: true

module DfE
module Analytics
module Services
      # Applies hidden policy tags to the final Airbyte table columns listed in the hidden PII config
class ApplyAirbyteFinalTablesPolicyTags
include ServicePattern

def initialize(delay_in_minutes: 0)
@delay_in_minutes = delay_in_minutes
end

def call
DfE::Analytics::Jobs::BigQueryApplyPolicyTagsJob.do(
delay_in_minutes: @delay_in_minutes,
dataset: DfE::Analytics.config.bigquery_airbyte_dataset,
tables: DfE::Analytics.hidden_pii,
policy_tag: DfE::Analytics.config.bigquery_hidden_policy_tag
)
end
end
end
end
end
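The services above all `include ServicePattern` (required from `dfe/analytics/shared/service_pattern`) and are invoked as `Service.call(...)`. A minimal sketch of what such a concern typically provides, assuming it adds a class-level `.call` that instantiates the service and delegates to the instance's `#call` (the `TagList` example service is hypothetical, for illustration only):

```ruby
# Minimal ServicePattern sketch: Service.call(args) becomes
# Service.new(args).call, keeping call sites short and uniform.
module ServicePattern
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def call(*args, **kwargs)
      new(*args, **kwargs).call
    end
  end
end

# Hypothetical example service using the pattern.
class TagList
  include ServicePattern

  def initialize(tables:)
    @tables = tables
  end

  def call
    @tables.keys.sort
  end
end
```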