diff --git a/cspell.json b/cspell.json index eea51669780..dac48efdf3e 100644 --- a/cspell.json +++ b/cspell.json @@ -1122,7 +1122,9 @@ "reactnative", "realtime", "reCaptcha", + "recordcache", "RecyclerView", + "retryable", "redirect_to", "referrerpolicy", "refetches", @@ -1346,6 +1348,7 @@ "UIViewController", "unauth", "Unauth", + "uncategorized", "uncommenting", "unencrypted", "unioned", diff --git a/src/components/Layout/Layout.tsx b/src/components/Layout/Layout.tsx index 6cb2910ee9d..90f1d4b6010 100644 --- a/src/components/Layout/Layout.tsx +++ b/src/components/Layout/Layout.tsx @@ -374,7 +374,8 @@ export const Layout = ({ )} {(asPathWithNoHash.includes('/push-notifications/') || asPathWithNoHash.includes('/analytics/') || - asPathWithNoHash.includes('/in-app-messaging/')) && ( + asPathWithNoHash.includes('/in-app-messaging/')) && + !asPathWithNoHash.includes('/kinesis') && ( )} {asPathWithNoHash.includes('/interactions/') && ( diff --git a/src/directory/directory.mjs b/src/directory/directory.mjs index ffe67e4bb6d..317f4c105d9 100644 --- a/src/directory/directory.mjs +++ b/src/directory/directory.mjs @@ -474,6 +474,10 @@ export const directory = { path: 'src/pages/[platform]/build-a-backend/add-aws-services/analytics/set-up-analytics/index.mdx', section: 'backend' }, + { + path: 'src/pages/[platform]/build-a-backend/add-aws-services/analytics/kinesis/index.mdx', + section: 'backend' + }, { path: 'src/pages/[platform]/build-a-backend/add-aws-services/analytics/existing-resources/index.mdx', section: 'backend' @@ -800,6 +804,9 @@ export const directory = { { path: 'src/pages/[platform]/frontend/analytics/index.mdx', children: [ + { + path: 'src/pages/[platform]/frontend/analytics/kinesis/index.mdx' + }, { path: 'src/pages/[platform]/frontend/analytics/record-events/index.mdx' }, diff --git a/src/pages/[platform]/build-a-backend/add-aws-services/analytics/kinesis/index.mdx b/src/pages/[platform]/build-a-backend/add-aws-services/analytics/kinesis/index.mdx new file mode 100644 index 00000000000..d3908cc27cf --- /dev/null +++ b/src/pages/[platform]/build-a-backend/add-aws-services/analytics/kinesis/index.mdx @@ -0,0 +1,81 @@ +import { getCustomStaticPath } from '@/utils/getCustomStaticPath'; + +export const meta = { + title: 'Kinesis Data Streams', + description: 'Set up an Amazon Kinesis Data Stream and configure IAM permissions for the Amplify Kinesis Data Streams client.', + platforms: [ + 'android', + 'flutter', + 'swift' + ], +}; + +export const getStaticPaths = async () => { + return getCustomStaticPath(meta.platforms); +}; + +export function getStaticProps(context) { + return { + props: { + platform: context.params.platform, + meta + } + }; +} + +Use the [AWS Cloud Development Kit (AWS CDK)](https://docs.aws.amazon.com/cdk/latest/guide/home.html) to create an [Amazon Kinesis Data Stream](https://aws.amazon.com/kinesis/data-streams/) and grant your app the permissions it needs. For more on adding custom AWS resources to your Amplify backend, see [Custom resources](/[platform]/build-a-backend/add-aws-services/custom-resources/). + +## Set up a Kinesis stream + +```ts title="amplify/backend.ts" +import { defineBackend } from "@aws-amplify/backend"; +import { auth } from "./auth/resource"; +import { data } from "./data/resource"; +import { Policy, PolicyStatement } from "aws-cdk-lib/aws-iam"; +import { Stream } from "aws-cdk-lib/aws-kinesis"; +import { Stack } from "aws-cdk-lib/core"; + +const backend = defineBackend({ + auth, + data, +}); + +const kinesisStack = backend.createStack("kinesis-stack"); + +// Create a Kinesis stream +const kinesisStream = new Stream(kinesisStack, "KinesisStream", { + streamName: "myKinesisStream", + shardCount: 1, +}); + +// Grant PutRecords permission to authenticated users +const kinesisPolicy = new Policy(kinesisStack, "KinesisPolicy", { + statements: [ + new PolicyStatement({ + actions: ["kinesis:PutRecords"], + resources: [kinesisStream.streamArn], + }), + ], +}); + +backend.auth.resources.authenticatedUserIamRole.attachInlinePolicy(kinesisPolicy); +``` + +If you are not using the CDK, ensure your authenticated IAM role has the `kinesis:PutRecords` permission on your target stream: + +```json +{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Action": "kinesis:PutRecords", + "Resource": "arn:aws:kinesis:::stream/" + }] +} +``` + +For more information, see the [Amazon Kinesis Developer Documentation](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-iam.html). + +## Next steps + +Use the [Kinesis Data Streams client](/[platform]/frontend/analytics/kinesis/) to stream data from your app. diff --git a/src/pages/[platform]/frontend/analytics/kinesis/index.mdx b/src/pages/[platform]/frontend/analytics/kinesis/index.mdx new file mode 100644 index 00000000000..340d4d72ca5 --- /dev/null +++ b/src/pages/[platform]/frontend/analytics/kinesis/index.mdx @@ -0,0 +1,553 @@ +import { getCustomStaticPath } from '@/utils/getCustomStaticPath'; + +export const meta = { + title: 'Kinesis Data Streams client', + description: 'A standalone client for streaming data to Amazon Kinesis Data Streams with offline support, automatic batching, and configurable flushing.', + platforms: [ + 'swift', + 'android', + 'flutter' + ], +}; + +export const getStaticPaths = async () => { + return getCustomStaticPath(meta.platforms); +}; + +export function getStaticProps(context) { + return { + props: { + platform: context.params.platform, + meta + } + }; +} + +`AmplifyKinesisClient` is a standalone client for streaming data to [Amazon Kinesis Data Streams](https://aws.amazon.com/kinesis/data-streams/). It provides: + +- Local persistence for offline support +- Automatic retry for failed records +- Automatic batching (up to 500 records or 10 MB per request) +- Interval-based automatic flushing (default: every 30 seconds) +- Enable/disable toggle that silently drops new records while preserving cached ones + + + +This is a standalone client, separate from the Amplify Analytics category plugin. It communicates directly with the Kinesis Data Streams API using `PutRecords`. + + + + + +Before using this client, ensure your backend is configured with the required IAM permissions. See [Set up Kinesis Data Streams](/[platform]/build-a-backend/add-aws-services/analytics/kinesis/). + + + +## Getting started + +### Installation + + + +Add the dependency to your module's `build.gradle.kts`: + +```kotlin +dependencies { + implementation("com.amplifyframework:aws-kinesis:LATEST_VERSION") +} +``` + + + + + +Add `AmplifyKinesisClient` to your project using Swift Package Manager. In Xcode, go to **File > Add Package Dependencies** and enter the repository URL for the Amplify Swift SDK. + + + + + +Add the dependency to your `pubspec.yaml`: + +```yaml +dependencies: + amplify_kinesis: ^2.11.0 +``` + + + +### Initialize the client + + + +```kotlin +import com.amplifyframework.kinesis.AmplifyKinesisClient + +val kinesis = AmplifyKinesisClient( + context = applicationContext, + region = "us-east-1", + credentialsProvider = credentialsProvider +) +``` + + + + + +```swift +import AmplifyKinesisClient + +let kinesis = try AmplifyKinesisClient( + region: "us-east-1", + credentialsProvider: credentialsProvider +) +``` + + + + + +```dart +import 'package:amplify_kinesis/amplify_kinesis.dart'; + +final kinesis = await AmplifyKinesisClient.create( + region: 'us-east-1', + credentialsProvider: credentialsProvider, +); +``` + + + +### Configuration options + +You can customize the client behavior by passing an options object: + + + +| Option | Default | Description | +|---|---|---| +| `cacheMaxBytes` | 5 MB | Maximum size of the local record cache in bytes. | +| `maxRetries` | 5 | Maximum retry attempts per record before it is discarded. | +| `flushStrategy` | `FlushStrategy.Interval(30.seconds)` | Automatic flush interval. Use `FlushStrategy.None` for manual-only flushing. | +| `configureClient` | `null` | Escape hatch to customize the underlying AWS SDK `KinesisClient`. | + + + + + +| Option | Default | Description | +|---|---|---| +| `cacheMaxBytes` | 5 MB | Maximum size of the local record cache in bytes. | +| `maxRetries` | 5 | Maximum retry attempts per record before it is discarded. | +| `flushStrategy` | `.interval(30)` | Automatic flush interval in seconds. Use `.none` for manual-only flushing. | +| `configureClient` | `nil` | Closure to customize the underlying `KinesisClientConfiguration`. | + + + + + +| Option | Default | Description | +|---|---|---| +| `cacheMaxBytes` | 5 MB | Maximum size of the local record cache in bytes. | +| `maxRetries` | 5 | Maximum retry attempts per record before it is discarded. | +| `flushStrategy` | `FlushInterval(interval: Duration(seconds: 30))` | Automatic flush interval. Use `FlushNone()` for manual-only flushing. | + + + + + +```kotlin +import com.amplifyframework.kinesis.AmplifyKinesisClient +import com.amplifyframework.kinesis.AmplifyKinesisClientOptions +import com.amplifyframework.recordcache.FlushStrategy +import kotlin.time.Duration.Companion.seconds + +val kinesis = AmplifyKinesisClient( + context = applicationContext, + region = "us-east-1", + credentialsProvider = credentialsProvider, + options = AmplifyKinesisClientOptions { + cacheMaxBytes = 10L * 1024 * 1024 // 10 MB + maxRetries = 3 + flushStrategy = FlushStrategy.Interval(60.seconds) + configureClient { + retryStrategy { maxAttempts = 10 } + } + } +) +``` + +To disable automatic flushing: + +```kotlin +options = AmplifyKinesisClientOptions { + flushStrategy = FlushStrategy.None +} +``` + + + + + +```swift +let kinesis = try AmplifyKinesisClient( + region: "us-east-1", + credentialsProvider: credentialsProvider, + options: .init( + cacheMaxBytes: 10 * 1_024 * 1_024, // 10 MB + maxRetries: 3, + flushStrategy: .interval(60), + configureClient: { config in + // Customize the underlying KinesisClientConfiguration + } + ) +) +``` + +To disable automatic flushing: + +```swift +options: .init(flushStrategy: .none) +``` + + + + + +```dart +final kinesis = await AmplifyKinesisClient.create( + region: 'us-east-1', + credentialsProvider: credentialsProvider, + options: AmplifyKinesisClientOptions( + cacheMaxBytes: 10 * 1024 * 1024, // 10 MB + maxRetries: 3, + flushStrategy: FlushInterval(interval: Duration(seconds: 60)), + ), +); +``` + +To disable automatic flushing: + +```dart +options: AmplifyKinesisClientOptions( + flushStrategy: FlushNone(), +), +``` + + + +## Usage + +### Record data + +Use `record()` to persist data to the local cache. Records are sent to Kinesis during the next flush cycle (automatic or manual). + + + +```kotlin +val result = kinesis.record( + data = "Hello Kinesis".toByteArray(), + partitionKey = "partition-1", + streamName = "my-stream" +) +when (result) { + is Result.Success -> { /* recorded successfully */ } + is Result.Failure -> { /* handle error */ } +} +``` + + + + + +```swift +let result = try await kinesis.record( + data: "Hello Kinesis".data(using: .utf8)!, + partitionKey: "partition-1", + streamName: "my-stream" +) +``` + + + + + +```dart +final result = await kinesis.record( + data: Uint8List.fromList(utf8.encode('Hello Kinesis')), + partitionKey: 'partition-1', + streamName: 'my-stream', +); +switch (result) { + case Ok(): print('Recorded'); + case Error(:final error): print('Error: $error'); +} +``` + + + +Records submitted while the client is disabled are silently dropped. + +### Flush records + +The client automatically flushes cached records at the configured interval (default: 30 seconds). You can also trigger a manual flush: + + + +```kotlin +when (val result = kinesis.flush()) { + is Result.Success -> println("Flushed ${result.data.recordsFlushed} records") + is Result.Failure -> println("Flush error: ${result.error}") +} +``` + + + + + +```swift +let flushResult = try await kinesis.flush() +print("Flushed \(flushResult.recordsFlushed) records") +``` + + + + + +```dart +switch (await kinesis.flush()) { + case Ok(:final value): + print('Flushed ${value.recordsFlushed} records'); + case Error(:final error): + print('Flush failed: $error'); +} +``` + + + +Each flush sends at most one batch per stream (up to 500 records or 10 MB). Remaining records are picked up in subsequent flush cycles. If a flush is already in progress, the call returns immediately with `flushInProgress: true`. + +Manual flushes work even when the client is disabled, allowing you to drain cached records without re-enabling collection. + +### Clear cache + +Delete all cached records from local storage: + + + +```kotlin +kinesis.clearCache() +``` + + + + + +```swift +let cleared = try await kinesis.clearCache() +``` + + + + + +```dart +await kinesis.clearCache(); +``` + + + +### Enable and disable + +You can toggle record collection and automatic flushing at runtime. When disabled, new records are silently dropped but already-cached records remain in storage. + + + +```kotlin +kinesis.disable() +// Records are dropped, auto-flush paused + +kinesis.enable() +// Collection and auto-flush resume +``` + + + + + +```swift +await kinesis.disable() +// Records are dropped, auto-flush paused + +await kinesis.enable() +// Collection and auto-flush resume +``` + + + + + +```dart +kinesis.disable(); +// Records are dropped, auto-flush paused + +kinesis.enable(); +// Collection and auto-flush resume +``` + + + + + +### Close the client + +When you're done with the client, release its resources. The client cannot be reused after closing. + +```dart +await kinesis.close(); +``` + + + +## Advanced + +### Escape hatch + +Access the underlying AWS SDK `KinesisClient` for operations not covered by this client's API: + + + +```kotlin +val sdkClient = kinesis.kinesisClient +// Use sdkClient for direct Kinesis API calls +``` + + + + + +```swift +let sdkClient = kinesis.getKinesisClient() +// Use sdkClient for direct Kinesis API calls +``` + + + + + +```dart +final sdkClient = kinesis.kinesisClient; +// Use sdkClient for direct Kinesis API calls +``` + + + +### Error handling + +All operations surface errors through a sealed exception hierarchy: + + + +| Error type | Description | +|---|---| +| `AmplifyKinesisValidationException` | Record input validation failed (oversized record, invalid partition key). | +| `AmplifyKinesisLimitExceededException` | Local cache is full. Call `flush()` or `clearCache()` to free space. | +| `AmplifyKinesisStorageException` | Local database error. | +| `AmplifyKinesisUnknownException` | Unexpected or uncategorized error. | + +Operations return `Result`: + +```kotlin +when (val result = kinesis.record(...)) { + is Result.Success -> { /* success */ } + is Result.Failure -> when (result.error) { + is AmplifyKinesisValidationException -> { /* invalid input */ } + is AmplifyKinesisLimitExceededException -> { /* cache full */ } + is AmplifyKinesisStorageException -> { /* database error */ } + is AmplifyKinesisUnknownException -> { /* unexpected error */ } + } +} +``` + + + + + +| Error type | Description | +|---|---| +| `KinesisError.validation` | Record input validation failed (oversized record, invalid partition key). | +| `KinesisError.cacheLimitExceeded` | Local cache is full. Call `flush()` or `clearCache()` to free space. | +| `KinesisError.cache` | Local database error. | +| `KinesisError.unknown` | Unexpected or uncategorized error. | + +Operations throw `KinesisError`: + +```swift +do { + try await kinesis.record( + data: payload, + partitionKey: "key", + streamName: "stream" + ) +} catch let error as KinesisError { + switch error { + case .validation(let desc, _, _): + print("Validation error: \(desc)") + case .cacheLimitExceeded: + print("Cache full") + case .cache(let desc, _, _): + print("Storage error: \(desc)") + case .unknown(let desc, _, _): + print("Unknown error: \(desc)") + } +} +``` + + + + + +| Error type | Description | +|---|---| +| `KinesisValidationException` | Record input validation failed (oversized record, invalid partition key). | +| `KinesisLimitExceededException` | Local cache is full. Call `flush()` or `clearCache()` to free space. | +| `KinesisStorageException` | Local database error. | +| `KinesisUnknownException` | Unexpected or uncategorized error. | +| `ClientClosedException` | The client was closed and cannot be used. | + +Operations return `Result` with `AmplifyKinesisException` subtypes: + +```dart +switch (await kinesis.record(...)) { + case Ok(): break; + case Error(:final error): + switch (error) { + case KinesisValidationException(): // invalid input + case KinesisLimitExceededException(): // cache full + case KinesisStorageException(): // database error + case KinesisUnknownException(): // unexpected error + case ClientClosedException(): // client was closed + } +} +``` + + + +### Retry behavior + +- All `PutRecords` error codes (`ProvisionedThroughputExceededException`, `InternalFailure`) are treated as retryable. +- Each failed record's retry count is incremented after each attempt. +- Records exceeding `maxRetries` (default: 5) are permanently deleted from the cache. +- SDK-level Kinesis errors are logged and skipped per-stream, so other streams can still flush. +- Non-SDK errors (network failures, storage errors) abort the flush entirely. + +### Kinesis service limits + +The client enforces these limits before sending to the service: + +| Limit | Value | +|---|---| +| Max records per `PutRecords` request | 500 | +| Max single record size | 10 MB | +| Max total payload per `PutRecords` request | 10 MB | +| Max partition key length | 256 characters | diff --git a/src/utils/__tests__/getPageSection.test.ts b/src/utils/__tests__/getPageSection.test.ts index 7e3a32176a4..7770f805fd9 100644 --- a/src/utils/__tests__/getPageSection.test.ts +++ b/src/utils/__tests__/getPageSection.test.ts @@ -41,9 +41,7 @@ describe('getPageSection', () => { }); it('returns frontend for SSR pages at new path', () => { - const result = getPageSection( - '/[platform]/frontend/server-side-rendering' - ); + const result = getPageSection('/[platform]/frontend/server-side-rendering'); expect(result.section).toBe('frontend'); }); @@ -54,6 +52,11 @@ describe('getPageSection', () => { expect(result.featureRoute).toBe('/[platform]/frontend/auth'); }); + it('returns backend featureRoute for frontend auth pages', () => { + const result = getPageSection('/[platform]/frontend/auth/sign-in'); + expect(result.featureRoute).toBe('/[platform]/build-a-backend/auth'); + }); + it('returns frontend featureRoute for deeply nested backend pages', () => { const result = getPageSection( '/[platform]/build-a-backend/data/data-modeling/add-fields' @@ -70,4 +73,67 @@ describe('getPageSection', () => { const result = getPageSection('/gen1/[platform]/build-a-backend/auth'); expect(result.section).toBeUndefined(); }); + + it('returns backend add-aws-services featureRoute for frontend analytics pages', () => { + const result = getPageSection( + '/[platform]/frontend/analytics/record-events' + ); + expect(result.featureRoute).toBe( + '/[platform]/build-a-backend/add-aws-services/analytics' + ); + }); + + it('returns deep backend featureRoute for frontend kinesis page', () => { + const result = getPageSection('/[platform]/frontend/analytics/kinesis'); + expect(result.featureRoute).toBe( + '/[platform]/build-a-backend/add-aws-services/analytics/kinesis' + ); + }); + + it('returns frontend featureRoute for backend add-aws-services analytics page', () => { + const result = getPageSection( + '/[platform]/build-a-backend/add-aws-services/analytics/set-up-analytics' + ); + expect(result.featureRoute).toBe('/[platform]/frontend/analytics'); + }); + + it('returns backend featureRoute for frontend geo page', () => { + const result = getPageSection('/[platform]/frontend/geo/maps'); + expect(result.featureRoute).toBe( + '/[platform]/build-a-backend/add-aws-services/geo' + ); + }); + + it('returns frontend featureRoute for backend geo page', () => { + const result = getPageSection( + '/[platform]/build-a-backend/add-aws-services/geo/set-up-geo' + ); + expect(result.featureRoute).toBe('/[platform]/frontend/geo'); + }); + + it('returns frontend featureRoute for backend storage page', () => { + const result = getPageSection( + '/[platform]/build-a-backend/storage/set-up-storage' + ); + expect(result.featureRoute).toBe('/[platform]/frontend/storage'); + }); + + it('returns backend featureRoute for frontend storage page', () => { + const result = getPageSection('/[platform]/frontend/storage/upload-files'); + expect(result.featureRoute).toBe('/[platform]/build-a-backend/storage'); + }); + + it('returns undefined featureRoute for pages with no cross-section match', () => { + const result = getPageSection( + '/[platform]/build-a-backend/functions/set-up-function' + ); + expect(result.featureRoute).toBeUndefined(); + }); + + it('returns undefined featureRoute for Gen1 pages', () => { + const result = getPageSection( + '/gen1/[platform]/build-a-backend/auth/set-up-auth' + ); + expect(result.featureRoute).toBeUndefined(); + }); }); diff --git a/src/utils/getPageSection.ts b/src/utils/getPageSection.ts index bbe2dc49916..3e8c8fe3de8 100644 --- a/src/utils/getPageSection.ts +++ b/src/utils/getPageSection.ts @@ -4,8 +4,7 @@ import { SectionKey } from '@/data/sections'; /** * Walk the directory tree from a page's pathname upward to find * the nearest ancestor with a non-'both' section tag. - * Also finds the feature category route (e.g., /[platform]/build-a-backend/auth) - * for smart CrossLink targeting. + * Also finds the best matching backend/frontend route for CrossLink targeting. */ export function getPageSection(pathname: string): { section: SectionKey | undefined; @@ -24,34 +23,58 @@ export function getPageSection(pathname: string): { } } - // Find feature category for CrossLink targeting. - // For backend pages, link to the corresponding frontend category. - // For frontend pages, link to the corresponding backend category. - let featureRoute: string | undefined; - const backendFeature = pathname.match( - /\/\[platform\]\/build-a-backend\/([^/]+)/ - ); - const frontendFeature = pathname.match(/\/\[platform\]\/frontend\/([^/]+)/); - - if (backendFeature) { - // Backend page → link to frontend equivalent - const feature = backendFeature[1]; - const frontendNode = findDirectoryNode( - `/[platform]/frontend/${feature}` - ); - if (frontendNode) { - featureRoute = `/[platform]/frontend/${feature}`; + // CrossLink: find the corresponding page in the other section. + // Try progressively shorter sub-paths for the best match. + const featureRoute = findCrossLink(pathname); + + return { section, featureRoute }; +} + +const BACKEND_ROOTS = [ + '/[platform]/build-a-backend/add-aws-services/', + '/[platform]/build-a-backend/' +]; +const FRONTEND_ROOT = '/[platform]/frontend/'; + +/** + * Given a pathname, find the best matching route in the opposite section. + * Tries deepest sub-path first, then walks up to feature-level. + */ +function findCrossLink(pathname: string): string | undefined { + // Backend → Frontend + for (const root of BACKEND_ROOTS) { + if (pathname.startsWith(root)) { + const relative = pathname.slice(root.length); + return findBestMatch(relative, FRONTEND_ROOT); } - } else if (frontendFeature) { - // Frontend page → link to backend equivalent - const feature = frontendFeature[1]; - const backendNode = findDirectoryNode( - `/[platform]/build-a-backend/${feature}` - ); - if (backendNode) { - featureRoute = `/[platform]/build-a-backend/${feature}`; + } + + // Frontend → Backend (try sub-paths from deepest to shallowest) + if (pathname.startsWith(FRONTEND_ROOT)) { + const relative = pathname.slice(FRONTEND_ROOT.length); + for (const root of BACKEND_ROOTS) { + const match = findBestMatch(relative, root); + if (match) return match; } } - return { section, featureRoute }; + return undefined; +} + +/** + * Try progressively shorter sub-paths of `relative` under `targetRoot`. + * e.g. relative="analytics/kinesis" tries "analytics/kinesis" then "analytics". + */ +function findBestMatch( + relative: string, + targetRoot: string +): string | undefined { + const parts = relative.split('/').filter(Boolean); + for (let i = parts.length; i > 0; i--) { + const candidate = targetRoot + parts.slice(0, i).join('/'); + if (findDirectoryNode(candidate)) { + return candidate; + } + } + return undefined; }