Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
b5d5b42
feat: Audit Source Enum
Warren-Pitterson Mar 31, 2026
52b9524
feat: add queues package
Warren-Pitterson Mar 31, 2026
d3b9991
feat: docker
Warren-Pitterson Mar 31, 2026
8f469bf
feat: sln
Warren-Pitterson Mar 31, 2026
596c412
feat: audit writer function
Warren-Pitterson Mar 31, 2026
debc070
feat: migration
Warren-Pitterson Mar 31, 2026
24af8be
feat: EF model
Warren-Pitterson Mar 31, 2026
18ced9d
feat: queue
Warren-Pitterson Mar 31, 2026
03adaef
feat: ParticipantAuditMessage model
Warren-Pitterson Mar 31, 2026
104628b
feat: receive function queue implementation
Warren-Pitterson Mar 31, 2026
d9e8677
test: tests
Warren-Pitterson Mar 31, 2026
87079a5
chore: removed unnecessary usings
Warren-Pitterson Mar 31, 2026
d59f6b7
Merge branch 'main' into feat/DTOSS-12573-Audit-Log-Table
Warren-Pitterson Apr 7, 2026
2e7ba48
chore: docker sonarqube security fix
Warren-Pitterson Apr 7, 2026
70ba022
Merge branch 'feat/DTOSS-12573-Audit-Log-Table' of https://github.com…
Warren-Pitterson Apr 7, 2026
68af007
Update application/CohortManager/src/Functions/Shared/Common/Extensio…
Warren-Pitterson Apr 7, 2026
980fa1c
chore: PR comment reviews
Warren-Pitterson Apr 7, 2026
8ad27e2
Merge branch 'feat/DTOSS-12573-Audit-Log-Table' of https://github.com…
Warren-Pitterson Apr 7, 2026
f39519b
chore: removed PII
Warren-Pitterson Apr 7, 2026
ba0f97d
chore: removed PII
Warren-Pitterson Apr 7, 2026
94018b9
chore: PII removed
Warren-Pitterson Apr 7, 2026
690a667
feat: return changed to throw
Warren-Pitterson Apr 7, 2026
6dab401
feat: lazy initiation
Warren-Pitterson Apr 7, 2026
5300254
test: updated tests
Warren-Pitterson Apr 7, 2026
1ef4bfd
test: test covering the audit failure path.
Warren-Pitterson Apr 7, 2026
b8d15db
feat: removed redundant Function suffix and changed raw_data_ref to b…
Warren-Pitterson Apr 9, 2026
a05315b
test: added using, date change
Warren-Pitterson Apr 10, 2026
ba5a60f
Merge branch 'main' into feat/DTOSS-12573-Audit-Log-Table
Warren-Pitterson Apr 10, 2026
dbe9ee5
feat: write to blob container, rename container, fallback
Warren-Pitterson Apr 14, 2026
3182d28
test: test added
Warren-Pitterson Apr 14, 2026
a19584d
chore: changed from CREATED_DATETIME to DATE_CREATED to match other s…
Warren-Pitterson Apr 15, 2026
99bd4dc
chore: removed source from blobPath,
Warren-Pitterson Apr 15, 2026
055ed8d
fix: sonarqube error handling
Warren-Pitterson Apr 15, 2026
ef85585
fix: sonarqube - dockerfile change
Warren-Pitterson Apr 15, 2026
6f90778
test: update test to new implementation
Warren-Pitterson Apr 15, 2026
bb6770d
refactor: changed from queue to service bus trigger
Warren-Pitterson Apr 16, 2026
f415474
feat: createdDatetime model changes
Warren-Pitterson Apr 16, 2026
75fe27f
feat: participantAuditLogs made public
Warren-Pitterson Apr 16, 2026
e5e97d0
feat: index changes
Warren-Pitterson Apr 16, 2026
275a03f
test: unit tests
Warren-Pitterson Apr 16, 2026
d46b2d6
test: unit test
Warren-Pitterson Apr 16, 2026
0544439
Merge branch 'main' into feat/DTOSS-12573-Audit-Log-Table
Warren-Pitterson Apr 16, 2026
57a12b2
chore: remove unnecessary using
Warren-Pitterson Apr 16, 2026
2e1abe8
feat: replaced auditQueueSender for IQueueClient
Warren-Pitterson Apr 16, 2026
05e1c8e
chore: queue renamed
Warren-Pitterson Apr 16, 2026
0dda248
feat: removed lazy loading and added BlobStorageHelper Method
Warren-Pitterson Apr 16, 2026
36ac055
chore: typo
Warren-Pitterson Apr 16, 2026
ba0a743
test: updated tests
Warren-Pitterson Apr 16, 2026
4e05854
chore: whitespace and hardcoded value changed to enum
Warren-Pitterson Apr 16, 2026
7d4d91f
chore: whitespace
Warren-Pitterson Apr 16, 2026
73bd965
feat: add environment variables to dockerfile
Warren-Pitterson Apr 16, 2026
7c4f69b
feat: Sequential audit sends in batches
Warren-Pitterson Apr 16, 2026
ddd338b
feat: Queues added to service-bus config, graceful fallback on durabl…
Warren-Pitterson Apr 16, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<PackageVersion Include="Azure.Security.KeyVault.Secrets" Version="4.6.0" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageVersion Include="Azure.Storage.Queues" Version="12.20.1" />
<PackageVersion Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Queues" Version="5.5.0" />
<PackageVersion Include="Azure.Messaging.EventGrid" Version="4.28.0" />
<PackageVersion Include="Microsoft.ApplicationInsights.DependencyCollector" Version="2.23.0" />
<PackageVersion Include="Microsoft.ApplicationInsights.WorkerService" Version="2.23.0" />
Expand Down
15 changes: 15 additions & 0 deletions application/CohortManager/Set-up/service-bus/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@
"Namespaces": [
{
"Name": "sbemulatorns",
"Queues": [
{
"Name": "participant-audit-queue",
"Properties": {
"DeadLetteringOnMessageExpiration": false,
"DefaultMessageTimeToLive": "PT1H",
"DuplicateDetectionHistoryTimeWindow": "PT20S",
"ForwardDeadLetteredMessagesTo": "",
"LockDuration": "PT1M",
"MaxDeliveryCount": 10,
"RequiresDuplicateDetection": false,
"RequiresSession": false
}
}
],
"Topics": [
{
"Name": "create-exception-topic",
Expand Down
16 changes: 16 additions & 0 deletions application/CohortManager/compose.core.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,22 @@ services:
- CohortDistributionDataServiceURL=http://cohort-distribution-data-service:7992/api/CohortDistributionDataService/
- AcceptableLatencyThresholdMs=500

# Audit Services
audit-writer:
container_name: audit-writer
image: cohort-manager-audit-writer
networks: [cohman-network]
build:
context: ./src/Functions/
dockerfile: AuditServices/AuditWriter/Dockerfile
args:
BASE_IMAGE: ${FUNCTION_BASE_IMAGE}
environment:
- AzureWebJobsStorage=${AZURITE_CONNECTION_STRING}
- DtOsDatabaseConnectionString=Server=db,1433;Database=${DB_NAME};User Id=SA;Password=${PASSWORD};TrustServerCertificate=True
- ServiceBusConnectionString=Endpoint=sb://service-bus;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;
- AuditQueueName=participant-audit-queue


# Screening Data Service
create-exception:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
namespace NHS.CohortManager.AuditServices;

using System.Text.Json;
using DataServices.Database;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Model;
using Common;

public class AuditWriter
{
private const string AuditBlobContainer = "participant-audit";
private readonly DataServicesContext _dbContext;
private readonly ILogger<AuditWriter> _logger;
private readonly IBlobStorageHelper _blobStorageHelper;
private readonly AuditWriterConfig _config;

private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNameCaseInsensitive = true
};

public AuditWriter(DataServicesContext dbContext, ILogger<AuditWriter> logger, IBlobStorageHelper blobStorageHelper, AuditWriterConfig config)
{
_dbContext = dbContext;
_logger = logger;
_blobStorageHelper = blobStorageHelper;
_config = config;
}

[Function(nameof(AuditWriter))]
public async Task Run(
[ServiceBusTrigger("%AuditQueueName%", Connection = "ServiceBusConnectionString")] string messageText, FunctionContext context)
{
ParticipantAuditMessage? audit;
try
{
audit = JsonSerializer.Deserialize<ParticipantAuditMessage>(messageText, JsonOptions);
}
Comment thread
Warren-Pitterson marked this conversation as resolved.
catch (JsonException ex)
{
throw new InvalidOperationException("Failed to deserialise audit message.", ex);
}

if (audit is null)
{
_logger.LogError("Failed to deserialise audit message");
throw new JsonException("Audit message deserialised to null.");
}

audit.RawDataRef = await WriteSnapshotToBlobAsync(audit);

var auditLog = new ParticipantAuditLog
{
CorrelationId = audit.CorrelationId,
NhsNumber = audit.NhsNumber,
BatchId = audit.BatchId,
CreatedDatetime = audit.CreatedDatetime,
RecordSource = (int)audit.Source,
RecordSourceDesc = audit.RecordSourceDesc,
CreatedBy = audit.CreatedBy,
ScreeningId = audit.ScreeningId,
RawDataRef = audit.RawDataRef
};

_dbContext.Set<ParticipantAuditLog>().Add(auditLog);
await _dbContext.SaveChangesAsync();

_logger.LogInformation("Audit written | Source: {Source} | Correlation: {CorrelationId}",
audit.Source, audit.CorrelationId);
}

private async Task<string?> WriteSnapshotToBlobAsync(ParticipantAuditMessage message)
{
try
{
var blobPath = $"{message.CreatedDatetime:dd-MM-yyyy}/{message.CorrelationId}.json";

var payload = message.RequestSnapshot is not null
? JsonSerializer.SerializeToUtf8Bytes(message.RequestSnapshot)
: JsonSerializer.SerializeToUtf8Bytes(message);

var blobFile = new BlobFile(payload, blobPath);
var uri = await _blobStorageHelper.UploadFileToBlobStorageAndGetUri(
_config.AzureWebJobsStorage, AuditBlobContainer, blobFile, overwrite: true);

if (uri is null)
{
_logger.LogError(
"Failed to write audit snapshot to blob for CorrelationId {CorrelationId}. " +
"Audit will be persisted without a blob reference.",
message.CorrelationId);
return null;
}

return uri;
}
catch (Exception ex)
{
_logger.LogError(ex,
"Failed to write audit snapshot to blob for CorrelationId {CorrelationId}. " +
"Audit will be persisted without a blob reference.",
message.CorrelationId);
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ProjectGuid>{A1B2C3D4-E5F6-7890-ABCD-EF1234567890}</ProjectGuid>
<TargetFramework>net8.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Functions.Worker" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.ServiceBus" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" />
<PackageReference Include="Microsoft.Extensions.Diagnostics.HealthChecks" />
<PackageReference Include="Azure.Storage.Blobs" />
<PackageReference Include="Azure.Messaging.ServiceBus" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
</ItemGroup>
<ItemGroup>
<Using Include="System.Threading.ExecutionContext" Alias="ExecutionContext" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\Shared\DataServices.Database\DataServices.Database.csproj" />
<ProjectReference Include="..\..\Shared\DataServices.Core\DataServices.Core.csproj" />
<ProjectReference Include="..\..\Shared\Common\Common.csproj" />
<ProjectReference Include="..\..\Shared\HealthChecks\HealthChecks.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace NHS.CohortManager.AuditServices;

using System.ComponentModel.DataAnnotations;

public class AuditWriterConfig
{
[Required]
public required string ServiceBusConnectionString { get; set; }
[Required]
public required string AzureWebJobsStorage { get; set; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
ARG BASE_IMAGE
FROM $BASE_IMAGE AS function

COPY ./AuditServices/AuditWriter /app/src/dotnet-function-app
WORKDIR /app/src/dotnet-function-app

RUN --mount=type=cache,target=/root/.nuget/packages \
dotnet publish ./*.csproj --output /home/site/wwwroot

# To enable ssh & remote debugging on app service change the base image to the one below
# FROM mcr.microsoft.com/azure-functions/dotnet-isolated:4-dotnet-isolated8.0-appservice
FROM mcr.microsoft.com/azure-functions/dotnet-isolated:4-dotnet-isolated8.0
ENV AzureWebJobsScriptRoot=/home/site/wwwroot \
AzureFunctionsJobHost__Logging__Console__IsEnabled=true

COPY --from=function ["/home/site/wwwroot", "/home/site/wwwroot"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using DataServices.Core;
using DataServices.Database;
using HealthChecks.Extensions;
using Common;
using NHS.CohortManager.AuditServices;

var host = new HostBuilder()
.ConfigureFunctionsWorkerDefaults()
.AddConfiguration<AuditWriterConfig>(out AuditWriterConfig config)
.AddDataServicesHandler<DataServicesContext>()
.AddServiceBusClient(config.ServiceBusConnectionString)
.ConfigureServices(services =>
{
services.AddDatabaseHealthCheck("AuditWriter");
services.AddSingleton(config);
services.AddTransient<IBlobStorageHelper, BlobStorageHelper>();
})
.AddTelemetry()
.Build();

await host.RunAsync();
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ public async Task<bool> PostDemographicDataAsync(List<ParticipantDemographic> pa
{
var content = JsonSerializer.Serialize(participants);
var response = await _httpClientFunction.SendPost(DemographicFunctionURI, content);
if (!response.IsSuccessStatusCode || response.Headers.Location is null)
{
_logger.LogError("DurableDemographicFunction returned {StatusCode} with no Location header", response.StatusCode);
return false;
}
responseContent = response.Headers.Location.ToString();

responseContent = response.Headers.Location!.ToString();
// This is not retrying the function if it fails but checking if it has done yet.
var retryPolicy = Policy
.HandleResult<WorkFlowStatus>(status => status != WorkFlowStatus.Completed && status != WorkFlowStatus.Failed)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Common;
Expand All @@ -9,7 +8,6 @@
using Model;
using DataServices.Client;
using HealthChecks.Extensions;
using Microsoft.Extensions.Options;


var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace NHS.Screening.ReceiveCaasFile;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Model;
using Model.Enums;
using System;
using System.IO;
using ParquetSharp.RowOriented;
Expand All @@ -19,14 +20,16 @@ public class ReceiveCaasFile
private readonly ReceiveCaasFileConfig _config;
private readonly IBlobStorageHelper _blobStorageHelper;
private readonly IExceptionHandler _exceptionHandler;
private readonly IQueueClient _queueClient;

public ReceiveCaasFile(
ILogger<ReceiveCaasFile> logger,
IProcessCaasFile processCaasFile,
IDataServiceClient<ScreeningLkp> screeningLkpClient,
IOptions<ReceiveCaasFileConfig> receiveCaasFileConfig,
IBlobStorageHelper blobStorageHelper,
IExceptionHandler exceptionHandler
IExceptionHandler exceptionHandler,
IQueueClient queueClient
)
{
_logger = logger;
Expand All @@ -35,6 +38,7 @@ IExceptionHandler exceptionHandler
_config = receiveCaasFileConfig.Value;
_blobStorageHelper = blobStorageHelper;
_exceptionHandler = exceptionHandler;
_queueClient = queueClient;
}

[Function(nameof(ReceiveCaasFile))]
Expand All @@ -56,15 +60,17 @@ public async Task Run([BlobTrigger("inbound/{name}", Connection = "caasfolder_ST
downloadFilePath = Path.Combine(Path.GetTempPath(), name);

_logger.LogInformation("Downloading file from the blob, file: {Name}.", name);
// In order to use the parquet file we need to download it

// In order to use the parquet file we need to download it
await using (var fileStream = File.Create(downloadFilePath))
{
await blobStream.CopyToAsync(fileStream);
}

var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

var batchId = Guid.NewGuid();

using (var rowReader = ParquetFile.CreateRowReader<ParticipantsParquetMap>(downloadFilePath))
{
// A Parquet file is divided into one or more row groups. Each row group contains a specific number of rows.
Expand All @@ -74,6 +80,7 @@ public async Task Run([BlobTrigger("inbound/{name}", Connection = "caasfolder_ST
var listOfAllValues = values.ToList();
var allTasks = new List<Task>();

await EnqueueAuditMessagesAsync(listOfAllValues, name, batchId, (int)screeningService.ScreeningId);
//split list of all into N amount of chunks to be processed as batches.
Comment thread
Warren-Pitterson marked this conversation as resolved.
var chunks = listOfAllValues.Chunk(BatchSize).ToList();

Expand Down Expand Up @@ -125,4 +132,29 @@ public async Task<ScreeningLkp> GetScreeningService(FileNameParser fileNameParse

return screeningService;
}

private async Task EnqueueAuditMessagesAsync(
List<ParticipantsParquetMap> participants,
string fileName,
Guid batchId,
int screeningId)
{
var auditMessages = participants
.Where(w => w.NhsNumber.HasValue)
.Select(participant => new ParticipantAuditMessage
{
NhsNumber = participant.NhsNumber!.Value.ToString(),
Source = AuditSource.ParquetFile,
BatchId = batchId,
RecordSourceDesc = $"Parquet file: {fileName}",
CreatedBy = nameof(ReceiveCaasFile),
ScreeningId = screeningId,
});

var failCount = await _queueClient.AddBatchAsync(auditMessages, _config.AuditQueueName);
if (failCount != 0)
{
_logger.LogWarning("Audit enqueue failed for {FailCount} participants in batch {BatchId}", failCount, batchId);
}
Comment thread
Warren-Pitterson marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public class ReceiveCaasFileConfig
public required string GetOrchestrationStatusURL { get; set; }
[Required]
public required string ParticipantManagementTopic { get; set; }
[Required]
public required string AuditQueueName { get; set; }
}
Loading
Loading