Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions eng/packages/ProjectTemplates.props
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
<PackageVersion Include="ModelContextProtocol" Version="0.7.0-preview.1" />
<PackageVersion Include="ModelContextProtocol.AspNetCore" Version="0.7.0-preview.1" />
<PackageVersion Include="OllamaSharp" Version="5.4.12" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.14.0" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.14.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.14.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.14.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Runtime" Version="1.14.0" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.15.2" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Http" Version="1.15.1" />
<PackageVersion Include="OpenTelemetry.Instrumentation.Runtime" Version="1.15.1" />
<PackageVersion Include="PdfPig" Version="0.1.12" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ internal static class ProcessFile
internal const string ActivityName = "ProcessFile";
internal const string FilePathTagName = "rag.file.path";
}

internal static class ProcessDocument
{
internal const string ActivityName = "ProcessDocument";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ namespace Microsoft.Extensions.DataIngestion;
/// <typeparam name="T">The type of the chunk content.</typeparam>
public sealed class IngestionPipeline<T> : IDisposable
{
private readonly IngestionDocumentReader _reader;
private readonly IngestionChunker<T> _chunker;
private readonly IngestionChunkWriter<T> _writer;
private readonly ActivitySource _activitySource;
Expand All @@ -33,19 +32,16 @@ public sealed class IngestionPipeline<T> : IDisposable
/// <summary>
/// Initializes a new instance of the <see cref="IngestionPipeline{T}"/> class.
/// </summary>
/// <param name="reader">The reader for ingestion documents.</param>
/// <param name="chunker">The chunker to split documents into chunks.</param>
/// <param name="writer">The writer for processing chunks.</param>
/// <param name="options">The options for the ingestion pipeline.</param>
/// <param name="loggerFactory">The logger factory for creating loggers.</param>
public IngestionPipeline(
IngestionDocumentReader reader,
IngestionChunker<T> chunker,
IngestionChunkWriter<T> writer,
IngestionPipelineOptions? options = default,
ILoggerFactory? loggerFactory = default)
{
_reader = Throw.IfNull(reader);
_chunker = Throw.IfNull(chunker);
_writer = Throw.IfNull(writer);
_activitySource = new((options ?? new()).ActivitySourceName);
Expand All @@ -69,17 +65,36 @@ public void Dispose()
/// </summary>
public IList<IngestionChunkProcessor<T>> ChunkProcessors { get; } = [];

/// <summary>
/// Processes the specified document through the pipeline.
/// </summary>
/// <param name="document">The document to process.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A task representing the asynchronous operation, returning the processed document.</returns>
public async Task<IngestionDocument> ProcessAsync(IngestionDocument document, CancellationToken cancellationToken = default)
{
Throw.IfNull(document);

using (Activity? activity = _activitySource.StartActivity(ProcessDocument.ActivityName))
{
activity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
return await IngestAsync(document, activity, cancellationToken).ConfigureAwait(false);
}
}

/// <summary>
/// Processes all files in the specified directory that match the given search pattern and option.
/// </summary>
/// <param name="reader">The reader for ingestion documents.</param>
/// <param name="directory">The directory to process.</param>
/// <param name="searchPattern">The search pattern for file selection.</param>
/// <param name="searchOption">The search option for directory traversal.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo directory, string searchPattern = "*.*",
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IngestionDocumentReader reader, DirectoryInfo directory, string searchPattern = "*.*",
SearchOption searchOption = SearchOption.TopDirectoryOnly, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Throw.IfNull(reader);
Throw.IfNull(directory);
Throw.IfNullOrEmpty(searchPattern);
Throw.IfOutOfRange((int)searchOption, (int)SearchOption.TopDirectoryOnly, (int)SearchOption.AllDirectories);
Expand All @@ -91,7 +106,7 @@ public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo direct
.SetTag(ProcessDirectory.SearchOptionTagName, searchOption.ToString());
_logger?.ProcessingDirectory(directory.FullName, searchPattern, searchOption);

await foreach (var ingestionResult in ProcessAsync(directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false))
await foreach (IngestionResult ingestionResult in ProcessAsync(reader, directory.EnumerateFiles(searchPattern, searchOption), rootActivity, cancellationToken).ConfigureAwait(false))
{
yield return ingestionResult;
}
Expand All @@ -101,16 +116,18 @@ public async IAsyncEnumerable<IngestionResult> ProcessAsync(DirectoryInfo direct
/// <summary>
/// Processes the specified files.
/// </summary>
/// <param name="reader">The reader for ingestion documents.</param>
/// <param name="files">The collection of files to process.</param>
/// <param name="cancellationToken">The cancellation token for the operation.</param>
/// <returns>A task representing the asynchronous operation.</returns>
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, [EnumeratorCancellation] CancellationToken cancellationToken = default)
public async IAsyncEnumerable<IngestionResult> ProcessAsync(IngestionDocumentReader reader, IEnumerable<FileInfo> files, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
Throw.IfNull(reader);
Throw.IfNull(files);

using (Activity? rootActivity = _activitySource.StartActivity(ProcessFiles.ActivityName))
{
await foreach (var ingestionResult in ProcessAsync(files, rootActivity, cancellationToken).ConfigureAwait(false))
await foreach (IngestionResult ingestionResult in ProcessAsync(reader, files, rootActivity, cancellationToken).ConfigureAwait(false))
{
yield return ingestionResult;
}
Expand All @@ -125,7 +142,7 @@ private static void TraceException(Activity? activity, Exception ex)
.SetStatus(ActivityStatusCode.Error, ex.Message);
}

private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInfo> files, Activity? rootActivity,
private async IAsyncEnumerable<IngestionResult> ProcessAsync(IngestionDocumentReader reader, IEnumerable<FileInfo> files, Activity? rootActivity,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
#if NET
Expand All @@ -143,13 +160,13 @@ private async IAsyncEnumerable<IngestionResult> ProcessAsync(IEnumerable<FileInf
using (Activity? processFileActivity = _activitySource.StartActivity(ProcessFile.ActivityName, ActivityKind.Internal, parentContext: rootActivity?.Context ?? default))
{
processFileActivity?.SetTag(ProcessFile.FilePathTagName, fileInfo.FullName);
_logger?.ReadingFile(fileInfo.FullName, GetShortName(_reader));
_logger?.ReadingFile(fileInfo.FullName, GetShortName(reader));

IngestionDocument? document = null;
Exception? failure = null;
try
{
document = await _reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);
document = await reader.ReadAsync(fileInfo, cancellationToken).ConfigureAwait(false);

processFileActivity?.SetTag(ProcessSource.DocumentIdTagName, document.Identifier);
_logger?.ReadDocument(document.Identifier);
Expand Down Expand Up @@ -181,7 +198,7 @@ private async Task<IngestionDocument> IngestAsync(IngestionDocument document, Ac
}

IAsyncEnumerable<IngestionChunk<T>> chunks = _chunker.ProcessAsync(document, cancellationToken);
foreach (var processor in ChunkProcessors)
foreach (IngestionChunkProcessor<T> processor in ChunkProcessors)
{
chunks = processor.ProcessAsync(chunks, cancellationToken);
}
Expand Down
32 changes: 32 additions & 0 deletions src/Libraries/Microsoft.Extensions.DataIngestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,38 @@ using VectorStoreWriter<string, IngestionChunkVectorRecord<string>> writer = new
await writer.WriteAsync(chunks);
```

## Using the IngestionPipeline

### Processing documents from the file system

To process documents from the file system, create an `IngestionPipeline` and pass a reader to the `ProcessAsync` method:

```csharp
using IngestionPipeline<string> pipeline = new(chunker, writer);

IngestionDocumentReader reader = new MarkdownReader();
await foreach (IngestionResult result in pipeline.ProcessAsync(reader, directory, "*.md"))
{
Console.WriteLine($"Processed '{result.DocumentId}'. Succeeded: {result.Succeeded}");
}
```

### Processing documents without a reader

The `IngestionPipeline` can also process documents that are already in memory, without requiring a reader:

```csharp
using IngestionPipeline<string> pipeline = new(chunker, writer);

IngestionDocument document = new("my-document-id");
IngestionDocumentSection section = new("Main");
section.Elements.Add(new IngestionDocumentHeader("# Introduction") { Level = 1 });
section.Elements.Add(new IngestionDocumentParagraph("This is the content of my document."));
document.Sections.Add(section);

IngestionDocument processedDocument = await pipeline.ProcessAsync(document);
```

### Custom metadata

To store custom metadata alongside each chunk, create a type derived from `IngestionChunkVectorRecord<TChunk>` with additional properties, and a `VectorStoreWriter` subclass that overrides `SetMetadata`:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ public async Task IngestDataAsync(DirectoryInfo directory, string searchPattern)
IncrementalIngestion = false,
});

DocumentReader reader = new(directory);
using var pipeline = new IngestionPipeline<string>(
reader: new DocumentReader(directory),
chunker: new SemanticSimilarityChunker(embeddingGenerator, new(TiktokenTokenizer.CreateForModel("gpt-4o"))),
writer: writer,
loggerFactory: loggerFactory);

await foreach (var result in pipeline.ProcessAsync(directory, searchPattern))
await foreach (var result in pipeline.ProcessAsync(reader, directory, searchPattern))
{
logger.LogInformation("Completed processing '{id}'. Succeeded: '{succeeded}'.", result.DocumentId, result.Succeeded);
}
Expand Down
Loading
Loading