-
Notifications
You must be signed in to change notification settings - Fork 25
Sagas
This Service is part of the legal-entity-core module and it creates Legal Entities and their supporting objects from an aggregate model (Legal Entity) that contains all info required to ingest one Customer or multiple customers in a hierarchy.
-
For each Legal Entity object, it will either retrieve the existing Legal Entity or create a new one.
-
Next, it will either create or update the Administrator users which are used to create a Master Service agreement.
-
Next, After the users are created/retrieved and enriched with their internal Ids we can set up the Master Service
-
Next, it will create/update product groups (Arrangements) if they exist in the Legal Entity model
TODO: Include the diagram
This service is part of the stream-product module, it used implicitly in Legal Entity Saga and can be used independently, also it's responsible for ingesting customer products, arrangements, and set up user permissions.
This service is part of the approvals-core module, and it creates Policies and Approval Types and their supporting objects, also it's responsible for ingesting/assigning all approvals flow.
- This is not idempotent implementation (due to lack of service-api. The request for improvements is created)
- For each Approval object, it will create a new one of Policy and Approval Type.
TODO this service is not implemented yet
TODO this service is not implemented yet
TODO this service is not implemented yet
The investment module (stream-investment/investment-core) contains three separate sagas for wealth management data ingestion:
- InvestmentSaga — Orchestrates client onboarding, portfolio products, portfolios, trading accounts, deposits, allocations, risk assessments, and model portfolios. Executes 9 sequential steps from client creation through allocation generation.
- InvestmentAssetUniverseSaga — Ingests static/reference data: currencies, markets, market special days, asset category types, asset categories, assets, close prices, and intraday prices. Uses async bulk API for price ingestion.
- InvestmentContentSaga — Ingests content management data: market news tags, news entries, document tags, and content documents.
All three sagas are idempotent (safe to retry) and can be individually enabled/disabled via configuration flags (wealth-enabled, asset-universe-enabled, content-enabled).
The stream-investment module orchestrates data ingestion for Backbase Investment (Wealth Management) services. It handles the complete lifecycle of investment data — from asset universe setup (markets, currencies, assets, prices) to client onboarding, portfolio creation, risk assessment, allocations, deposits, and content management (news & documents).
The module is implemented as a single Maven sub-module (investment-core) and exposes three independent sagas, each responsible for a distinct domain within the investment ingestion workflow.
| Artifact | Description |
|---|---|
stream-investment |
Parent POM (pom packaging) |
investment-core |
Core library containing sagas, services, models, and configuration |
Like all Stream Services modules, stream-investment follows the Saga + StreamTask pattern:
-
StreamTasksubclasses wrap the aggregate ingestion data. -
Saga classes implement
StreamTaskExecutor<T>withexecuteTask(T)returningMono<T>. - Each saga step is chained via
.flatMap(), propagating the task through the reactive pipeline. - Task state is tracked (
ACCEPTED→IN_PROGRESS→COMPLETED/FAILED) and history is recorded viatask.info()/task.error().
The investment module is decomposed into three separate sagas to allow independent execution of each concern:
| Saga | Task Class | Data Class | Purpose |
|---|---|---|---|
InvestmentSaga |
InvestmentTask |
InvestmentData |
Client onboarding, portfolio products, portfolios, trading accounts, deposits, allocations, risk assessments, model portfolios |
InvestmentAssetUniverseSaga |
InvestmentAssetsTask |
InvestmentAssetData |
Currencies, markets, market special days, asset category types, asset categories, assets, close prices, intraday prices |
InvestmentContentSaga |
InvestmentContentTask |
InvestmentContentData |
Market news tags, market news entries, document tags, content documents |
Each saga can be individually disabled via configuration properties (see Configuration).
The main saga that orchestrates investment client and portfolio ingestion. It executes the following steps sequentially:
upsertInvestmentPortfolioModels
→ upsertClients
→ upsertRiskQuestions
→ upsertRiskAssessments
→ upsertInvestmentProducts
→ upsertInvestmentPortfolios
→ upsertPortfolioTradingAccounts
→ upsertInvestmentPortfolioDeposits
→ upsertPortfoliosAllocations
| # | Step | Description |
|---|---|---|
| 1 | Upsert Portfolio Models | Creates or updates model portfolios (financial advice templates) with allocation definitions |
| 2 | Upsert Clients | Creates or updates investment client records, linking internal user IDs with external user IDs and legal entities |
| 3 | Upsert Risk Questions | Creates or updates risk assessment questionnaires extracted from portfolio risk assessment data |
| 4 | Upsert Risk Assessments | Creates or updates user risk assessment responses, matching assessments to clients by username |
| 5 | Upsert Investment Products | Creates or updates portfolio products (e.g., SELF_TRADING type) for each investment arrangement |
| 6 | Upsert Investment Portfolios | Creates or updates portfolios with client associations resolved from legal entity external IDs |
| 7 | Upsert Portfolio Trading Accounts | Creates or updates trading accounts associated with portfolios |
| 8 | Upsert Portfolio Deposits | Creates initial seed deposits for portfolios and generates deposit allocations |
| 9 | Upsert Portfolio Allocations | Waits for async price tasks to finish, then generates portfolio allocations based on products and asset data |
InvestmentTask
└── InvestmentData
├── clientUsers: List<ClientUser> // Users to onboard as investment clients
├── investmentArrangements: List<InvestmentArrangement> // Arrangement definitions
├── modelPortfolios: List<ModelPortfolio> // Model portfolio templates
├── portfolioProducts: List<PortfolioProduct> // Created portfolio products
├── portfolios: List<InvestmentPortfolio> // Created portfolios
├── investmentPortfolioTradingAccounts: List<InvestmentPortfolioTradingAccount>
├── portfolioRiskAssessments: List<PortfolioRiskAssessment>
└── investmentAssetData: InvestmentAssetData // Reference to asset data (for allocations)
- The saga catches all errors at the top level with
onErrorResume, logging failures and returning the task inFAILEDstate. - Individual steps log errors with full context (
taskId, counts) and record them in the task history. - Portfolio deposit creation failures are handled gracefully with
onErrorResume(Mono.empty())to avoid blocking the rest of the flow.
Orchestrates the ingestion of static/reference data for the investment universe:
upsertCurrencies
→ upsertMarkets
→ upsertMarketSpecialDays
→ upsertAssetCategoryTypes
→ upsertAssetCategories
→ upsertAssets
→ upsertPrices
→ createIntradayPrices
| # | Step | Description |
|---|---|---|
| 1 | Upsert Currencies | Creates or updates supported currencies |
| 2 | Upsert Markets | Creates or updates stock exchange markets (with session times, timezones) |
| 3 | Upsert Market Special Days | Creates or updates market holidays and special trading days |
| 4 | Upsert Asset Category Types | Creates or updates asset classification type definitions |
| 5 | Upsert Asset Categories | Creates or updates asset categories with optional logo images |
| 6 | Upsert Assets | Creates or updates financial instruments (ISIN-based), optionally with logo images |
| 7 | Upsert Prices | Ingests historical close prices for assets (async bulk operation) |
| 8 | Create Intraday Prices | Waits for close-price async tasks, then ingests intraday price data |
InvestmentAssetsTask
└── InvestmentAssetData
├── currencies: List<Currency>
├── markets: List<Market>
├── marketSpecialDays: List<MarketSpecialDay>
├── assetCategoryTypes: List<AssetCategoryType>
├── assetCategories: List<AssetCategoryEntry>
├── assets: List<Asset>
├── assetPrices: List<AssetPrice>
├── priceAsyncTasks: List<GroupResult> // Async task tracking for close prices
└── intradayPriceAsyncTasks: List<GroupResult> // Async task tracking for intraday prices
Price ingestion uses the async bulk API (AsyncBulkGroupsApi). Close prices are submitted as bulk groups, and the saga polls for completion before proceeding to intraday price ingestion. The AsyncTaskService handles polling and status checks.
Orchestrates the ingestion of content and news for the investment platform:
upsertNewsTags
→ upsertNewsContent
→ upsertDocumentTags
→ upsertContentDocuments
| # | Step | Description |
|---|---|---|
| 1 | Upsert News Tags | Creates or updates tags used to categorize market news |
| 2 | Upsert News Content | Creates or updates market news entries with optional thumbnails |
| 3 | Upsert Document Tags | Creates or updates tags used to categorize content documents |
| 4 | Upsert Content Documents | Creates or updates content document entries |
InvestmentContentTask
└── InvestmentContentData
├── marketNewsTags: List<ContentTag>
├── marketNews: List<MarketNewsEntry>
├── documentTags: List<ContentTag>
└── documents: List<ContentDocumentEntry>
The saga delegates all DBS API interactions to a set of thin service wrappers. Each service encapsulates a specific domain within the Investment Service API:
| Service | DBS API(s) Used | Responsibility |
|---|---|---|
InvestmentClientService |
ClientApi |
Create, list, get, patch, update investment clients |
InvestmentPortfolioService |
PortfolioApi, InvestmentProductsApi, PaymentsApi, PortfolioTradingAccountsApi
|
Portfolio products, portfolios, deposits, trading accounts |
InvestmentAssetUniverseService |
AssetUniverseApi, InvestmentRestAssetUniverseService
|
Markets, assets, asset categories, category types, special days |
InvestmentModelPortfolioService |
FinancialAdviceApi, CustomIntegrationApiService
|
Model portfolio CRUD |
InvestmentPortfolioAllocationService |
AllocationsApi, InvestmentApi, AssetUniverseApi, CustomIntegrationApiService
|
Portfolio allocations, orders, asset prices for allocation calculation |
InvestmentAssetPriceService |
AssetUniverseApi |
Historical close price ingestion |
InvestmentIntradayAssetPriceService |
AssetUniverseApi |
Intraday price ingestion |
InvestmentCurrencyService |
CurrencyApi |
Currency CRUD |
InvestmentRiskAssessmentService |
RiskAssessmentApi |
Risk assessment CRUD per client |
InvestmentRiskQuestionaryService |
RiskAssessmentApi |
Risk question/questionnaire management |
AsyncTaskService |
AsyncBulkGroupsApi |
Polls bulk operation status for async tasks |
WorkDayService |
(none — utility) | Business day calculations |
CustomIntegrationApiService |
Integration API (/integration-api/v2/) |
Asset creation, model portfolio CRUD with image upload (deprecated since 8.6.0) |
Some operations require multipart/form-data uploads that the generated WebClient wrappers cannot handle. These use synchronous RestTemplate-based clients:
| Service | Purpose |
|---|---|
InvestmentRestAssetUniverseService |
Asset and asset category logo/image uploads via PATCH |
InvestmentRestNewsContentService |
News content entry creation/update with thumbnail uploads |
InvestmentRestDocumentContentService |
Content document creation/update with file uploads |
These are configured in InvestmentRestServiceApiConfiguration using a separate synchronous ApiClient.
All services implement an idempotent upsert pattern:
- Attempt to GET/LIST the existing entity
- If found → PATCH/PUT to update
- If not found (404) → POST to create
This ensures safe re-execution and retry without duplicate data.
The entire investment module is conditional on a single property:
backbase:
bootstrap:
ingestions:
investment:
enabled: true # Master switch — must be true for any investment bean to loadIndividual saga flows can be toggled independently:
backbase:
bootstrap:
ingestions:
investment:
enabled: true
wealth-enabled: true # InvestmentSaga (clients, portfolios, etc.)
asset-universe-enabled: true # InvestmentAssetUniverseSaga
content-enabled: true # InvestmentContentSagaFine-grained properties for portfolio, allocation, deposit, and asset ingestion behaviour:
backbase:
bootstrap:
ingestions:
investment:
config:
portfolio:
default-currency: EUR # ISO 4217 currency for new portfolios
activation-past-months: 1 # How far back to set activation date
allocation:
model-portfolio-allocation-asset: "model_portfolio.allocation.asset"
allocation-concurrency: 5
default-amount: 10000.0
deposit:
provider: null # Payment provider ID (set for non-mock)
default-amount: 10000.0 # Seed deposit amount
asset:
ingest-images: true # Upload asset/category logos
market-concurrency: 5
market-special-day-concurrency: 5
asset-category-concurrency: 5
asset-category-type-concurrency: 5
assessment:
risk-questions-page-size: 100The Investment DBS client is configured via CompositeApiClientConfig:
backbase:
communication:
services:
investment:
service-id: investment # Spring Cloud service discovery ID
# direct-uri: http://localhost:8080 # Use for direct connection (bypasses discovery)Connection pool and timeout settings for the reactive WebClient:
backbase:
communication:
services:
investment:
http-client:
max-connections: 100
max-idle-time-minutes: 5
max-life-time-minutes: 10
max-pending-acquires: 200
pending-acquire-timeout-millis: 45000
connect-timeout-seconds: 10
read-timeout-seconds: 30
write-timeout-seconds: 30
evict-in-background-seconds: 120For multipart upload operations, a separate synchronous client is configured:
backbase:
investment:
communication:
integration:
service-id: investment
service-url: ""The module uses @Configuration + @Import chains (no component scanning):
InvestmentServiceConfiguration ← Main entry point
├── @Import(DbsApiClientsAutoConfiguration) ← Base DBS client auto-config
├── @Import(InvestmentClientConfig) ← Investment API client beans
│ ├── ApiClient (investmentApiClient)
│ ├── ClientApi, PortfolioApi, InvestmentProductsApi, ...
│ └── All 12+ Investment DBS API beans
├── @Import(InvestmentWebClientConfiguration) ← Connection pool & timeouts
├── @Import(InvestmentRestServiceApiConfiguration) ← REST Template clients
├── Service beans (InvestmentClientService, InvestmentPortfolioService, ...)
└── Saga beans (InvestmentSaga, InvestmentAssetUniverseSaga, InvestmentContentSaga)
All beans are registered with @ConditionalOnProperty(name = "backbase.bootstrap.ingestions.investment.enabled"), so the entire module is inactive unless explicitly enabled.
The module integrates with 46 unique Investment Service API endpoints across 10 API groups:
| API Group | Endpoints | Operations |
|---|---|---|
| Asset Universe API | 17 | Markets, assets, categories, category types, special days, prices |
| Client API | 5 | Client CRUD |
| Investment Products API | 3 | Portfolio product CRUD |
| Portfolio API | 3 | Portfolio CRUD |
| Financial Advice API | 3 | Model portfolio CRUD |
| Allocations API | 3 | Portfolio allocation CRUD |
| Content API | 3 | News/document content CRUD |
| Payments API | 2 | Deposit CRUD |
| Investment API | 2 | Order CRUD |
| Async Bulk Groups API | 1 | Bulk operation status polling |
| Integration API | 4 | Asset creation, model portfolio CRUD, allocation creation (deprecated since 8.6.0) |
For the full endpoint list, see
INVESTMENT_API_ENDPOINTS_USED.md.
stream-investment/
├── pom.xml # Parent POM
├── INVESTMENT_API_ENDPOINTS_USED.md # Complete API endpoint reference
└── investment-core/
├── pom.xml # Core module POM (boat-maven-plugin for code generation)
└── src/
├── main/java/com/backbase/stream/
│ ├── configuration/
│ │ ├── InvestmentServiceConfiguration.java # Main @Configuration (entry point)
│ │ ├── InvestmentClientConfig.java # DBS API client beans
│ │ ├── InvestmentWebClientConfiguration.java # Connection pool & timeouts
│ │ ├── InvestmentWebClientProperties.java # HTTP client properties
│ │ ├── InvestmentRestServiceApiConfiguration.java # RestTemplate clients
│ │ ├── InvestmentIngestionConfigurationProperties.java # Feature flags
│ │ └── IngestConfigProperties.java # Fine-grained service config
│ └── investment/
│ ├── saga/
│ │ ├── InvestmentSaga.java # Main client/portfolio saga
│ │ ├── InvestmentAssetUniverseSaga.java # Asset universe saga
│ │ └── InvestmentContentSaga.java # Content/news saga
│ ├── service/
│ │ ├── InvestmentClientService.java
│ │ ├── InvestmentPortfolioService.java
│ │ ├── InvestmentAssetUniverseService.java
│ │ ├── InvestmentModelPortfolioService.java
│ │ ├── InvestmentPortfolioAllocationService.java
│ │ ├── InvestmentAssetPriceService.java
│ │ ├── InvestmentIntradayAssetPriceService.java
│ │ ├── InvestmentCurrencyService.java
│ │ ├── InvestmentRiskAssessmentService.java
│ │ ├── InvestmentRiskQuestionaryService.java
│ │ ├── AsyncTaskService.java
│ │ ├── CustomIntegrationApiService.java
│ │ ├── WorkDayService.java
│ │ ├── AssetMapper.java # MapStruct mapper
│ │ └── resttemplate/
│ │ ├── InvestmentRestAssetUniverseService.java
│ │ ├── InvestmentRestNewsContentService.java
│ │ ├── InvestmentRestDocumentContentService.java
│ │ ├── ContentMapper.java
│ │ └── RestTemplateAssetMapper.java
│ ├── model/ # Domain model POJOs
│ │ ├── InvestmentPortfolio.java
│ │ ├── InvestmentPortfolioTradingAccount.java
│ │ ├── AssetCategoryEntry.java
│ │ ├── AssetWithMarketAndLatestPrice.java
│ │ ├── ContentDocumentEntry.java
│ │ ├── ContentTag.java
│ │ ├── ExpandedLatestPrice.java
│ │ ├── ExpandedMarket.java
│ │ ├── MarketNewsEntry.java
│ │ ├── PaginatedExpandedAssetList.java
│ │ ├── QuestionChoice.java
│ │ ├── RiskQuestion.java
│ │ ├── UpsertPartition.java
│ │ └── UserRiskAssessment.java
│ ├── InvestmentTask.java # StreamTask — main saga
│ ├── InvestmentAssetsTask.java # StreamTask — asset universe
│ ├── InvestmentContentTask.java # StreamTask — content
│ ├── InvestmentData.java # Aggregate data — main saga
│ ├── InvestmentAssetData.java # Aggregate data — asset universe
│ ├── InvestmentContentData.java # Aggregate data — content
│ ├── InvestmentDataValue.java # Common interface for data
│ ├── InvestmentArrangement.java
│ ├── ClientUser.java
│ ├── ModelPortfolio.java
│ ├── Allocation.java
│ ├── Asset.java
│ ├── AssetKey.java
│ ├── AssetPrice.java
│ ├── PortfolioRiskAssessment.java
│ └── RandomParam.java
└── test/java/com/backbase/stream/
├── configuration/
│ └── InvestmentClientConfigTest.java
└── investment/
├── saga/
│ ├── InvestmentSagaTest.java
│ ├── InvestmentAssetUniverseSagaTest.java
│ └── InvestmentContentSagaTest.java
└── service/
├── InvestmentClientServiceTest.java
├── InvestmentPortfolioServiceTest.java
├── InvestmentAssetUniverseServiceTest.java
├── InvestmentCurrencyServiceTest.java
├── InvestmentModelPortfolioServiceTest.java
├── InvestmentPortfolioAllocationServiceTest.java
├── InvestmentAssetPriceServiceTest.java
├── InvestmentIntradayAssetPriceServiceTest.java
├── InvestmentRiskAssessmentServiceTest.java
├── InvestmentRiskQuestionaryServiceTest.java
└── resttemplate/
├── InvestmentRestAssetUniverseServiceTest.java
├── InvestmentRestNewsContentServiceTest.java
└── InvestmentRestDocumentContentServiceTest.java
# Build investment module only (with upstream dependencies)
mvn clean install -pl stream-investment/investment-core -am
# Build without tests
mvn clean install -pl stream-investment/investment-core -am -DskipTest -Dmaven.test.skip=trueNote: The build uses
boat-maven-pluginto generate both reactive WebClient and synchronous RestTemplate API clients from the Investment Service OpenAPI spec (investment-service-api). Generated sources appear undertarget/generated-sources/— do not edit them manually.
Tests follow the standard Stream Services patterns:
-
JUnit 5 + Mockito + reactor-test (
@ExtendWith(MockitoExtension.class)) - DBS API clients are mocked (
@Mock ClientApi clientApi, etc.) - Reactive chains are verified with
StepVerifieror.block() - Configuration wiring is validated with
ApplicationContextRunner
| Test | What It Validates |
|---|---|
InvestmentSagaTest |
Full saga flow: client → products → portfolios → allocations |
InvestmentAssetUniverseSagaTest |
Asset universe flow: currencies → markets → assets → prices |
InvestmentContentSagaTest |
Content flow: tags → news → documents |
InvestmentClientConfigTest |
Spring context wiring for all Investment API beans |
InvestmentClientServiceTest |
Client upsert logic, deduplication, error handling |
InvestmentPortfolioServiceTest |
Portfolio product and portfolio CRUD |
InvestmentAssetUniverseServiceTest |
Market and asset upsert with image upload |
InvestmentPortfolioAllocationServiceTest |
Allocation generation from portfolio products |
| (and more...) | Each service has a corresponding unit test |
-
Three separate sagas — Asset universe, client/portfolio, and content concerns are fully decoupled. This allows running asset setup independently (e.g., on a schedule) without re-processing clients.
-
Async bulk price ingestion — Close prices use the
AsyncBulkGroupsApifor efficient bulk loading. The saga polls for completion before proceeding to dependent steps (intraday prices, allocations). -
Dual client strategy — Reactive
WebClientfor standard CRUD operations; synchronousRestTemplatefor multipart/form-data uploads that the generated WebClient wrappers cannot handle. -
Idempotent upsert everywhere — All operations use GET-then-PATCH/POST patterns, making the entire flow safe to retry without creating duplicates.
-
Concurrency control — Configurable concurrency limits per operation type prevent overwhelming the Investment Service with too many parallel requests (avoiding 503 errors).
-
Feature-flag granularity — Each saga can be individually enabled/disabled, and fine-grained settings control concurrency, default values, and image ingestion.
This saga is implemented in the loans-core module, creating loans in DBS's loan service. The task for the SAGA is created and triggered in Product Ingestion SAGA as the loan itself is a specific type of arrangement and the arrangement should be created before corresponding loan.