diff --git a/.infra/common.ts b/.infra/common.ts index 343349e589..04a43229b0 100644 --- a/.infra/common.ts +++ b/.infra/common.ts @@ -151,8 +151,8 @@ export const workers: Worker[] = [ subscription: 'api.new-notification-push', }, { - topic: 'api.v1.post-highlighted', - subscription: 'api.new-highlight-real-time', + topic: 'api.v1.highlight-created', + subscription: 'api.new-highlight-real-time-v2', }, { topic: 'api.v1.source-privacy-updated', @@ -312,8 +312,12 @@ export const workers: Worker[] = [ subscription: 'api.post-added-user-notification', }, { - topic: 'api.v1.post-highlighted', - subscription: 'api.major-highlight-tweet', + topic: 'api.v1.post-visible', + subscription: 'api.agentic-digest-tweet', + }, + { + topic: 'api.v1.highlight-created', + subscription: 'api.major-highlight-tweet-v2', }, { topic: 'api.v1.source-post-moderation-submitted', @@ -452,8 +456,8 @@ export const workers: Worker[] = [ subscription: 'api.recruiter-new-candidate-notification', }, { - topic: 'api.v1.post-highlighted', - subscription: 'api.major-headline-added-notification', + topic: 'api.v1.highlight-created', + subscription: 'api.major-headline-added-notification-v2', }, { topic: 'gondul.v1.candidate-application-scored', @@ -523,8 +527,8 @@ export const workers: Worker[] = [ subscription: 'api.generate-channel-digest', }, { - topic: 'api.v1.generate-channel-highlight', - subscription: 'api.generate-channel-highlight', + topic: 'api.v1.generate-highlights', + subscription: 'api.generate-highlights-v2', }, { topic: 'api.v1.user-deletion-requested', diff --git a/__tests__/cron/channelHighlights.ts b/__tests__/cron/channelHighlights.ts index 7282145d71..d5650bc511 100644 --- a/__tests__/cron/channelHighlights.ts +++ b/__tests__/cron/channelHighlights.ts @@ -1,6 +1,5 @@ import type { DataSource } from 'typeorm'; import createOrGetConnection from '../../src/db'; -import { ChannelHighlightDefinition } from '../../src/entity/ChannelHighlightDefinition'; import * as typedPubsub from '../../src/common/typedPubsub'; import channelHighlights from '../../src/cron/channelHighlights'; import { crons } from '../../src/cron/index'; @@ -14,7 +13,6 @@ beforeAll(async () => { describe('channelHighlights cron', () => { afterEach(async () => { jest.restoreAllMocks(); - await con.getRepository(ChannelHighlightDefinition).clear(); }); it('should be registered', () => { @@ -25,32 +23,11 @@ describe('channelHighlights cron', () => { expect(registeredCron).toBeDefined(); }); - it('should enqueue active highlight definitions', async () => { + it('should enqueue a global highlight generation run', async () => { const triggerTypedEventSpy = jest .spyOn(typedPubsub, 'triggerTypedEvent') .mockResolvedValue(); - await con.getRepository(ChannelHighlightDefinition).save([ - { - channel: 'backend', - mode: 'shadow', - candidateHorizonHours: 72, - maxItems: 10, - }, - { - channel: 'vibes', - mode: 'shadow', - candidateHorizonHours: 72, - maxItems: 10, - }, - { - channel: 'disabled', - mode: 'disabled', - candidateHorizonHours: 72, - maxItems: 10, - }, - ]); - const startedAt = Date.now(); await channelHighlights.handler(con, {} as never, {} as never); const completedAt = Date.now(); @@ -58,17 +35,8 @@ describe('channelHighlights cron', () => { expect(triggerTypedEventSpy.mock.calls).toEqual([ [ {}, - 'api.v1.generate-channel-highlight', + 'api.v1.generate-highlights', { - channel: 'backend', - scheduledAt: expect.any(String), - }, - ], - [ - {}, - 'api.v1.generate-channel-highlight', - { - channel: 'vibes', scheduledAt: expect.any(String), }, ], @@ -79,8 +47,5 @@ describe('channelHighlights cron', () => { ); expect(scheduledAt).toBeGreaterThanOrEqual(startedAt); expect(scheduledAt).toBeLessThanOrEqual(completedAt); - expect(triggerTypedEventSpy.mock.calls[0][2].scheduledAt).toBe( - triggerTypedEventSpy.mock.calls[1][2].scheduledAt, - ); }); }); diff --git a/__tests__/cron/cleanChannelHighlights.ts b/__tests__/cron/cleanChannelHighlights.ts index 864adfae54..9b3e17bfd3 100644 --- a/__tests__/cron/cleanChannelHighlights.ts +++ b/__tests__/cron/cleanChannelHighlights.ts @@ -5,6 +5,7 @@ import { expectSuccessfulCron } from '../helpers'; import { crons } from '../../src/cron/index'; import { ChannelHighlightRun } from '../../src/entity/ChannelHighlightRun'; import { PostHighlight } from '../../src/entity/PostHighlight'; +import { PostHighlightChannel } from '../../src/entity/PostHighlightChannel'; import { ArticlePost, Source } from '../../src/entity'; import { PostType } from '../../src/entity/posts/Post'; import { createSource } from '../fixture/source'; @@ -93,7 +94,8 @@ describe('cleanChannelHighlights cron', () => { afterEach(async () => { await con.getRepository(ChannelHighlightRun).clear(); - await con.getRepository(PostHighlight).clear(); + await con.getRepository(PostHighlightChannel).clear(); + await con.createQueryBuilder().delete().from(PostHighlight).execute(); await con .createQueryBuilder() .delete() @@ -114,7 +116,7 @@ describe('cleanChannelHighlights cron', () => { }); it('should delete retired highlights older than 30 days and expired runs', async () => { - await con.getRepository(PostHighlight).save([ + const highlights = await con.getRepository(PostHighlight).save([ { channel: 'vibes', postId: 'active-highlight', @@ -137,6 +139,26 @@ describe('cleanChannelHighlights cron', () => { retiredAt: sub(new Date(), { days: 1 }), }, ]); + await con.getRepository(PostHighlightChannel).save([ + { + highlightId: highlights[0].id, + channel: 'vibes', + placedAt: highlights[0].highlightedAt, + retiredAt: null, + }, + { + highlightId: highlights[1].id, + channel: 'vibes', + placedAt: highlights[1].highlightedAt, + retiredAt: new Date('2026-01-02T10:00:00.000Z'), + }, + { + highlightId: highlights[2].id, + channel: 'vibes', + placedAt: highlights[2].highlightedAt, + retiredAt: sub(new Date(), { days: 1 }), + }, + ]); await con.getRepository(ChannelHighlightRun).save([ { channel: 'vibes', @@ -164,14 +186,19 @@ describe('cleanChannelHighlights cron', () => { await expectSuccessfulCron(cleanChannelHighlights); - const highlights = await con.getRepository(PostHighlight).find({ + const remainingHighlights = await con.getRepository(PostHighlight).find({ order: { postId: 'ASC' }, }); + const remainingPlacements = await con + .getRepository(PostHighlightChannel) + .find({ + order: { highlightId: 'ASC' }, + }); const runs = await con.getRepository(ChannelHighlightRun).find({ order: { scheduledAt: 'ASC' }, }); - expect(highlights).toEqual([ + expect(remainingHighlights).toEqual([ expect.objectContaining({ postId: 'active-highlight', }), @@ -179,6 +206,7 @@ describe('cleanChannelHighlights cron', () => { postId: 'recently-retired-highlight', }), ]); + expect(remainingPlacements).toHaveLength(2); expect(runs).toHaveLength(1); expect(runs[0].completedAt).not.toBeNull(); }); diff --git a/__tests__/highlights.ts b/__tests__/highlights.ts index 336e461644..d452ac2b84 100644 --- a/__tests__/highlights.ts +++ b/__tests__/highlights.ts @@ -16,6 +16,7 @@ import { PostHighlight, PostHighlightSignificance, } from '../src/entity/PostHighlight'; +import { PostHighlightChannel } from '../src/entity/PostHighlightChannel'; import { PostType } from '../src/entity/posts/Post'; import { sourcesFixture } from './fixture/source'; @@ -87,11 +88,42 @@ const createTestPosts = async () => { ]); }; +const saveHighlights = async ( + items: Array< + Partial & { + postId: string; + channel: string; + highlightedAt: Date; + headline: string; + channels?: string[]; + } + >, +) => { + const highlights = await con.getRepository(PostHighlight).save(items); + + await con.getRepository(PostHighlightChannel).save( + highlights.flatMap((highlight, index) => + (items[index].channels || [items[index].channel]).map((channel) => ({ + highlightId: highlight.id, + channel, + placedAt: highlight.highlightedAt, + retiredAt: + items[index].retiredAt instanceof Date + ? items[index].retiredAt + : null, + })), + ), + ); + + return highlights; +}; + beforeEach(async () => { jest.resetAllMocks(); await con.getRepository(ChannelDigest).clear(); await con.getRepository(ChannelHighlightDefinition).clear(); - await con.getRepository(PostHighlight).clear(); + await con.getRepository(PostHighlightChannel).clear(); + await con.createQueryBuilder().delete().from(PostHighlight).execute(); await con.getRepository(ArticlePost).delete(['h1', 'h2', 'h3', 'h4']); await con .getRepository(Source) @@ -233,7 +265,7 @@ describe('query postHighlights', () => { it('should return highlights ordered by highlightedAt desc', async () => { await createTestPosts(); - await con.getRepository(PostHighlight).save([ + await saveHighlights([ { postId: 'h2', channel: 'happening-now', @@ -277,7 +309,7 @@ describe('query postHighlights', () => { it('should filter by channel', async () => { await createTestPosts(); - await con.getRepository(PostHighlight).save([ + await saveHighlights([ { postId: 'h1', channel: 'happening-now', @@ -307,7 +339,7 @@ describe('query postHighlights', () => { it('should hide retired highlights', async () => { await createTestPosts(); - await con.getRepository(PostHighlight).save([ + await saveHighlights([ { postId: 'h1', channel: 'happening-now', @@ -339,16 +371,9 @@ describe('query postHighlights', () => { }); describe('query majorHeadlines', () => { - it('should return only breaking and major headlines deduplicated by postId', async () => { + it('should return only live breaking and major canonical highlights', async () => { await createTestPosts(); await con.getRepository(PostHighlight).save([ - { - postId: 'h1', - channel: 'agentic', - highlightedAt: new Date('2026-03-19T10:30:00.000Z'), - headline: 'Major agentic headline', - significance: PostHighlightSignificance.Major, - }, { postId: 'h1', channel: 'vibes', diff --git a/__tests__/sitemaps.ts b/__tests__/sitemaps.ts index 0132abe492..ae2cee3fb1 100644 --- a/__tests__/sitemaps.ts +++ b/__tests__/sitemaps.ts @@ -32,6 +32,7 @@ import { keywordsFixture } from './fixture/keywords'; import { ONE_DAY_IN_SECONDS } from '../src/common/constants'; import { ChannelHighlightDefinition } from '../src/entity/ChannelHighlightDefinition'; import { PostHighlight } from '../src/entity/PostHighlight'; +import { PostHighlightChannel } from '../src/entity/PostHighlightChannel'; let app: FastifyInstance; let con: DataSource; const previousSitemapLimit = process.env.SITEMAP_LIMIT; @@ -530,7 +531,7 @@ describe('GET /sitemaps/highlights.xml', () => { order: 0, }, ]); - await con.getRepository(PostHighlight).save([ + const highlights = await con.getRepository(PostHighlight).save([ { postId: 'p1', channel: 'career', @@ -543,17 +544,27 @@ describe('GET /sitemaps/highlights.xml', () => { highlightedAt: new Date('2026-04-12T09:00:00.000Z'), headline: 'Career latest', }, + ]); + await con.getRepository(PostHighlightChannel).save([ { - postId: 'p1', + highlightId: highlights[0].id, + channel: 'career', + placedAt: new Date('2026-04-10T10:00:00.000Z'), + }, + { + highlightId: highlights[0].id, channel: 'backend', - highlightedAt: new Date('2026-04-09T08:00:00.000Z'), - headline: 'Backend live', + placedAt: new Date('2026-04-09T08:00:00.000Z'), }, { - postId: 'p4', + highlightId: highlights[1].id, + channel: 'career', + placedAt: new Date('2026-04-12T09:00:00.000Z'), + }, + { + highlightId: highlights[1].id, channel: 'backend', - highlightedAt: new Date('2026-04-13T08:00:00.000Z'), - headline: 'Backend retired', + placedAt: new Date('2026-04-13T08:00:00.000Z'), retiredAt: new Date('2026-04-13T08:30:00.000Z'), }, ]); diff --git a/__tests__/workers/cdc/primary.ts b/__tests__/workers/cdc/primary.ts index 801d51a409..b54019490b 100644 --- a/__tests__/workers/cdc/primary.ts +++ b/__tests__/workers/cdc/primary.ts @@ -3,7 +3,6 @@ import { CandidateStatus, OpportunityState, OpportunityType, - PostHighlightedMessage, } from '@dailydotdev/schema'; import { Achievement, @@ -3226,7 +3225,6 @@ describe('post highlight', () => { highlightedAt: 1_770_000_000_000_000, headline: 'JavaScript in 2026', significance: PostHighlightSignificance.Major, - reason: 'Fast ecosystem update', retiredAt: null, createdAt: 1_770_000_000_000_000, updatedAt: 1_770_000_000_000_000, @@ -3236,7 +3234,7 @@ describe('post highlight', () => { jest.clearAllMocks(); }); - it('should publish highlighted event on create', async () => { + it('should not publish highlight events from CDC on create', async () => { await expectSuccessfulBackground( worker, mockChangeMessage({ @@ -3247,18 +3245,7 @@ describe('post highlight', () => { }), ); - expectTypedEvent( - 'api.v1.post-highlighted', - new PostHighlightedMessage({ - highlightId: base.id, - channel: base.channel, - postId: base.postId, - headline: base.headline, - significance: base.significance, - reason: base.reason ?? undefined, - highlightedAt: base.highlightedAt, - }), - ); + expect(triggerTypedEvent).not.toHaveBeenCalled(); }); it('should not publish highlighted event on update', async () => { diff --git a/__tests__/workers/generateChannelHighlight.ts b/__tests__/workers/generateChannelHighlight.ts index 03f2942e9b..715fb4a4fd 100644 --- a/__tests__/workers/generateChannelHighlight.ts +++ b/__tests__/workers/generateChannelHighlight.ts @@ -1,28 +1,19 @@ import { IsNull, type DataSource } from 'typeorm'; import createOrGetConnection from '../../src/db'; -import { ChannelDigest } from '../../src/entity/ChannelDigest'; import { ChannelHighlightDefinition } from '../../src/entity/ChannelHighlightDefinition'; -import { ChannelHighlightRun } from '../../src/entity/ChannelHighlightRun'; -import { AGENTS_DIGEST_SOURCE, UNKNOWN_SOURCE } from '../../src/entity/Source'; +import { AGENTS_DIGEST_SOURCE } from '../../src/entity/Source'; import { PostHighlight, PostHighlightSignificance, } from '../../src/entity/PostHighlight'; -import { - ArticlePost, - CollectionPost, - SharePost, - Source, -} from '../../src/entity'; -import { - PostRelation, - PostRelationType, -} from '../../src/entity/posts/PostRelation'; +import { PostHighlightChannel } from '../../src/entity/PostHighlightChannel'; +import { ArticlePost, Source } from '../../src/entity'; import { PostType } from '../../src/entity/posts/Post'; import worker from '../../src/workers/generateChannelHighlight'; import { typedWorkers } from '../../src/workers/index'; import { deleteKeysByPattern } from '../../src/redis'; import * as evaluator from '../../src/common/channelHighlight/evaluate'; +import * as typedPubsub from '../../src/common/typedPubsub'; import { expectSuccessfulTypedBackground } from '../helpers'; import { createSource } from '../fixture/source'; @@ -36,20 +27,12 @@ const saveArticle = async ({ id, title, createdAt, - statsUpdatedAt = createdAt, - metadataChangedAt = createdAt, - channel = 'vibes', - sourceId = 'content-source', - contentCuration = [] as string[], + channels, }: { id: string; title: string; createdAt: Date; - statsUpdatedAt?: Date; - metadataChangedAt?: Date; - channel?: string; - sourceId?: string; - contentCuration?: string[]; + channels: string[]; }) => con.getRepository(ArticlePost).save({ id, @@ -58,42 +41,7 @@ const saveArticle = async ({ url: `https://example.com/${id}`, canonicalUrl: `https://example.com/${id}`, score: 0, - sourceId, - visible: true, - deleted: false, - banned: false, - showOnFeed: true, - createdAt, - metadataChangedAt, - statsUpdatedAt, - type: PostType.Article, - contentCuration, - contentMeta: { - channels: [channel], - }, - }); - -const saveCollection = async ({ - id, - title, - createdAt, - channel = 'vibes', - sourceId = 'content-source', -}: { - id: string; - title: string; - createdAt: Date; - channel?: string; - sourceId?: string; -}) => - con.getRepository(CollectionPost).save({ - id, - shortId: id, - title, - url: `https://example.com/${id}`, - canonicalUrl: `https://example.com/${id}`, - score: 0, - sourceId, + sourceId: 'content-source', visible: true, deleted: false, banned: false, @@ -101,49 +49,15 @@ const saveCollection = async ({ createdAt, metadataChangedAt: createdAt, statsUpdatedAt: createdAt, - type: PostType.Collection, + type: PostType.Article, contentMeta: { - channels: [channel], + channels, }, }); -const saveShare = async ({ - id, - sharedPostId, - createdAt, - sourceId = 'content-source', - title = 'Shared story', - upvotes = 0, - private: isPrivate = false, -}: { - id: string; - sharedPostId: string; - createdAt: Date; - sourceId?: string; - title?: string; - upvotes?: number; - private?: boolean; -}) => - con.getRepository(SharePost).save({ - id, - shortId: id, - title, - sourceId, - sharedPostId, - createdAt, - metadataChangedAt: createdAt, - statsUpdatedAt: createdAt, - visible: true, - deleted: false, - banned: false, - private: isPrivate, - showOnFeed: true, - upvotes, - type: PostType.Share, - }); - describe('generateChannelHighlight worker', () => { beforeEach(async () => { + jest.restoreAllMocks(); await con .getRepository(Source) .save([ @@ -153,32 +67,29 @@ describe('generateChannelHighlight worker', () => { 'https://daily.dev/content.png', ), createSource( - 'secondary-source', - 'Secondary', - 'https://daily.dev/secondary.png', + AGENTS_DIGEST_SOURCE, + 'Agents Digest', + 'https://daily.dev/agents.png', ), ]); }); afterEach(async () => { - jest.restoreAllMocks(); - await deleteKeysByPattern('channel-highlight:*'); - await con.getRepository(ChannelHighlightRun).clear(); + await deleteKeysByPattern('highlights:*'); + await con.getRepository(PostHighlightChannel).clear(); + await con.createQueryBuilder().delete().from(PostHighlight).execute(); await con.getRepository(ChannelHighlightDefinition).clear(); - await con.getRepository(ChannelDigest).clear(); - await con.getRepository(PostHighlight).clear(); - await con.getRepository(PostRelation).clear(); await con .createQueryBuilder() .delete() .from('post') .where('"sourceId" IN (:...sourceIds)', { - sourceIds: ['content-source', 'secondary-source', AGENTS_DIGEST_SOURCE], + sourceIds: ['content-source', AGENTS_DIGEST_SOURCE], }) .execute(); await con .getRepository(Source) - .delete(['content-source', 'secondary-source', AGENTS_DIGEST_SOURCE]); + .delete(['content-source', AGENTS_DIGEST_SOURCE]); }); it('should be registered', () => { @@ -189,825 +100,216 @@ describe('generateChannelHighlight worker', () => { expect(registeredWorker).toBeDefined(); }); - it('should keep live highlights unchanged in shadow mode and store a comparison run', async () => { - const now = new Date('2026-03-03T10:00:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'shadow', - targetAudience: 'Developers following vibes', - candidateHorizonHours: 72, - maxItems: 3, - }); - await saveArticle({ - id: 'live-1', - title: 'Live story', - createdAt: new Date('2026-03-03T08:00:00.000Z'), - }); - await saveArticle({ - id: 'fresh-1', - title: 'Fresh story', - createdAt: new Date('2026-03-03T09:20:00.000Z'), - }); - await con.getRepository(PostHighlight).save({ - channel: 'vibes', - postId: 'live-1', - highlightedAt: new Date('2026-03-03T09:00:00.000Z'), - headline: 'Live headline', - }); - - const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') - .mockResolvedValue({ - items: [ - { - postId: 'fresh-1', - headline: 'Fresh headline', - significanceLabel: 'breaking', - reason: 'test', - }, - ], - }); + it('should create one canonical highlight and place it in every matching publish channel', async () => { + const now = new Date('2026-03-03T12:00:00.000Z'); + const triggerTypedEventSpy = jest + .spyOn(typedPubsub, 'triggerTypedEvent') + .mockResolvedValue(); - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, + await con.getRepository(ChannelHighlightDefinition).save([ + { + channel: 'backend', + mode: 'publish', + order: 1, + candidateHorizonHours: 72, + maxItems: 3, + }, { channel: 'vibes', - scheduledAt: now.toISOString(), + mode: 'publish', + order: 2, + candidateHorizonHours: 72, + maxItems: 3, }, - ); - - expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].targetAudience).toBe( - 'Developers following vibes', - ); - expect(evaluatorSpy.mock.calls[0][0].maxItems).toBe(3); - expect(evaluatorSpy.mock.calls[0][0].currentHighlights).toEqual([ - expect.objectContaining({ - postId: 'live-1', - headline: 'Live headline', - }), ]); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - title: 'Fresh story', - relatedItemsCount: 1, - }), - ]); - - const liveHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', retiredAt: IsNull() }, - order: { highlightedAt: 'DESC' }, - }); - expect(liveHighlights).toHaveLength(1); - expect(liveHighlights[0]).toMatchObject({ - postId: 'live-1', - headline: 'Live headline', - }); - - const run = await con.getRepository(ChannelHighlightRun).findOneByOrFail({ - channel: 'vibes', - }); - expect(run.status).toBe('completed'); - expect(run.comparison).toMatchObject({ - wouldPublish: true, - published: false, - baselineCount: 1, - internalCount: 2, - addedPostIds: ['fresh-1'], - }); - }); - - it('should publish admitted highlights in publish mode and trim FIFO by maxItems', async () => { - const now = new Date('2026-03-03T11:00:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'publish', - candidateHorizonHours: 72, - maxItems: 2, - }); - await saveArticle({ - id: 'old-live', - title: 'Older live story', - createdAt: new Date('2026-03-03T08:00:00.000Z'), - }); - await saveArticle({ - id: 'new-live', - title: 'Newer live story', - createdAt: new Date('2026-03-03T09:00:00.000Z'), - }); await saveArticle({ id: 'fresh-1', - title: 'Fresh candidate', - createdAt: new Date('2026-03-03T10:30:00.000Z'), + title: 'Fresh story', + createdAt: new Date('2026-03-03T11:30:00.000Z'), + channels: ['backend', 'vibes'], }); - await con.getRepository(PostHighlight).save([ - { - channel: 'vibes', - postId: 'new-live', - highlightedAt: new Date('2026-03-03T10:00:00.000Z'), - headline: 'Newer live headline', - }, - { - channel: 'vibes', - postId: 'old-live', - highlightedAt: new Date('2026-03-03T09:00:00.000Z'), - headline: 'Older live headline', - }, - ]); jest.spyOn(evaluator, 'evaluateChannelHighlights').mockResolvedValue({ items: [ { postId: 'fresh-1', - headline: 'Fresh headline', + headline: 'Fresh highlight', significanceLabel: 'breaking', - reason: 'test', }, ], }); - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + await expectSuccessfulTypedBackground<'api.v1.generate-highlights'>( worker, { - channel: 'vibes', scheduledAt: now.toISOString(), }, ); - const liveHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', retiredAt: IsNull() }, - order: { highlightedAt: 'DESC' }, - }); - expect(liveHighlights).toHaveLength(2); - expect(liveHighlights[0]).toMatchObject({ + const highlight = await con.getRepository(PostHighlight).findOneByOrFail({ postId: 'fresh-1', - headline: 'Fresh headline', - significance: PostHighlightSignificance.Breaking, - reason: 'test', - }); - expect(liveHighlights[1]).toMatchObject({ - postId: 'new-live', - headline: 'Newer live headline', - }); - const retiredHighlight = await con.getRepository(PostHighlight).findOneBy({ - channel: 'vibes', - postId: 'old-live', }); - expect(retiredHighlight?.retiredAt).toBeInstanceOf(Date); - }); - - it('should upgrade a highlighted article to its collection without re-evaluating it', async () => { - const now = new Date('2026-03-03T11:30:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'publish', - candidateHorizonHours: 72, - maxItems: 3, - }); - await saveCollection({ - id: 'col-upgrade', - title: 'Collection upgrade', - createdAt: new Date('2026-03-03T11:10:00.000Z'), - }); - await saveArticle({ - id: 'child-upgrade', - title: 'Child story', - createdAt: new Date('2026-03-03T11:05:00.000Z'), - }); - await con.getRepository(PostRelation).save({ - postId: 'col-upgrade', - relatedPostId: 'child-upgrade', - type: PostRelationType.Collection, - }); - await con.getRepository(PostHighlight).save({ - channel: 'vibes', - postId: 'child-upgrade', - highlightedAt: new Date('2026-03-03T11:00:00.000Z'), - headline: 'Original child headline', - significance: PostHighlightSignificance.Major, - reason: 'existing', - }); - - const evaluatorSpy = jest.spyOn(evaluator, 'evaluateChannelHighlights'); - - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, - { - channel: 'vibes', - scheduledAt: now.toISOString(), + const placements = await con.getRepository(PostHighlightChannel).find({ + where: { + highlightId: highlight.id, + retiredAt: IsNull(), }, - ); - - expect(evaluatorSpy).not.toHaveBeenCalled(); - - const liveHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', retiredAt: IsNull() }, - }); - expect(liveHighlights).toEqual([ - expect.objectContaining({ - postId: 'col-upgrade', - headline: 'Original child headline', - significance: PostHighlightSignificance.Major, - reason: 'existing', - }), - ]); - expect(liveHighlights[0].highlightedAt.toISOString()).toBe( - '2026-03-03T11:00:00.000Z', - ); - const retiredHighlight = await con.getRepository(PostHighlight).findOneBy({ - channel: 'vibes', - postId: 'child-upgrade', - }); - expect(retiredHighlight?.retiredAt).toBeInstanceOf(Date); - }); - - it('should exclude retired highlights from candidates and keep them retired', async () => { - const now = new Date('2026-03-03T11:45:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'publish', - candidateHorizonHours: 72, - maxItems: 3, - }); - await saveArticle({ - id: 'retired-1', - title: 'Previously highlighted story', - createdAt: new Date('2026-03-03T11:15:00.000Z'), - }); - await saveArticle({ - id: 'fresh-1', - title: 'Fresh candidate', - createdAt: new Date('2026-03-03T11:20:00.000Z'), - }); - await con.getRepository(PostHighlight).save({ - channel: 'vibes', - postId: 'retired-1', - highlightedAt: new Date('2026-03-03T11:00:00.000Z'), - headline: 'Previously highlighted headline', - significance: PostHighlightSignificance.Major, - reason: 'previous run', - retiredAt: new Date('2026-03-03T11:10:00.000Z'), - }); - - const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') - .mockResolvedValue({ - items: [ - { - postId: 'fresh-1', - headline: 'Fresh headline', - significanceLabel: 'breaking', - reason: 'test', - }, - ], - }); - - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, - { - channel: 'vibes', - scheduledAt: now.toISOString(), + order: { + channel: 'ASC', }, - ); - - expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - title: 'Fresh candidate', - }), - ]); - - const liveHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', retiredAt: IsNull() }, - }); - expect(liveHighlights).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - headline: 'Fresh headline', - }), - ]); - - const retiredHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', postId: 'retired-1' }, }); - expect(retiredHighlights).toHaveLength(1); - expect(retiredHighlights[0].retiredAt).toBeInstanceOf(Date); - }); - it('should send recent retired highlights to the evaluator while excluding resurfaced stories', async () => { - const now = new Date('2026-03-03T12:00:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'shadow', - candidateHorizonHours: 72, - maxItems: 3, - }); - await saveCollection({ - id: 'collection-1', - title: 'Collection story', - createdAt: new Date('2026-03-03T11:50:00.000Z'), - }); - await saveArticle({ - id: 'retired-child', - title: 'Retired child story', - createdAt: new Date('2026-03-03T11:20:00.000Z'), - }); - await saveArticle({ - id: 'fresh-child', - title: 'Fresh child story', - createdAt: new Date('2026-03-03T11:55:00.000Z'), - }); - await saveArticle({ - id: 'fresh-stand-1', - title: 'Fresh standalone story', - createdAt: new Date('2026-03-03T11:58:00.000Z'), - }); - await con.getRepository(PostRelation).save([ - { - postId: 'collection-1', - relatedPostId: 'retired-child', - type: PostRelationType.Collection, - }, - { - postId: 'collection-1', - relatedPostId: 'fresh-child', - type: PostRelationType.Collection, - }, - ]); - await con.getRepository(PostHighlight).save({ - channel: 'vibes', - postId: 'retired-child', - highlightedAt: new Date('2026-03-03T11:30:00.000Z'), - headline: 'Retired child headline', - significance: PostHighlightSignificance.Notable, - retiredAt: new Date('2026-03-03T11:40:00.000Z'), + expect(highlight).toMatchObject({ + postId: 'fresh-1', + channel: 'backend', + headline: 'Fresh highlight', + significance: PostHighlightSignificance.Breaking, + retiredAt: null, }); - - const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') - .mockResolvedValue({ items: [] }); - - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, - { - channel: 'vibes', - scheduledAt: now.toISOString(), - }, - ); - - expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].currentHighlights).toEqual([ + expect(placements).toEqual([ expect.objectContaining({ - postId: 'collection-1', - headline: 'Retired child headline', + highlightId: highlight.id, + channel: 'backend', }), - ]); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ expect.objectContaining({ - postId: 'fresh-stand-1', - title: 'Fresh standalone story', + highlightId: highlight.id, + channel: 'vibes', }), ]); - }); - - it('should ignore posts from channel digest sources for highlights', async () => { - const now = new Date('2026-03-03T11:50:00.000Z'); - await con - .getRepository(Source) - .save( - createSource( - AGENTS_DIGEST_SOURCE, - 'Agents Digest', - 'https://daily.dev/agents.png', - ), - ); - await con.getRepository(ChannelDigest).save({ - key: 'agentic', - sourceId: AGENTS_DIGEST_SOURCE, - channel: 'vibes', - targetAudience: 'Digest readers', - frequency: 'daily', - enabled: true, - }); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'publish', - candidateHorizonHours: 72, - maxItems: 3, - }); - await saveArticle({ - id: 'digest-post', - sourceId: AGENTS_DIGEST_SOURCE, - title: 'Digest source post', - createdAt: new Date('2026-03-03T11:20:00.000Z'), - }); - await saveArticle({ - id: 'fresh-1', - title: 'Fresh candidate', - createdAt: new Date('2026-03-03T11:25:00.000Z'), - }); - await con.getRepository(PostHighlight).save({ - channel: 'vibes', - postId: 'digest-post', - highlightedAt: new Date('2026-03-03T11:10:00.000Z'), - headline: 'Digest highlight', - significance: PostHighlightSignificance.Major, - reason: 'existing', - }); - - const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') - .mockResolvedValue({ - items: [ - { - postId: 'fresh-1', - headline: 'Fresh headline', - significanceLabel: 'major', - reason: 'test', - }, - ], - }); - - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, + expect(triggerTypedEventSpy).toHaveBeenCalledWith( + expect.anything(), + 'api.v1.highlight-created', { - channel: 'vibes', - scheduledAt: now.toISOString(), + highlightId: highlight.id, + postId: 'fresh-1', + headline: 'Fresh highlight', + significance: PostHighlightSignificance.Breaking, + highlightedAt: now.toISOString(), }, ); - - expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - }), - ]); - - const liveHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', retiredAt: IsNull() }, - }); - expect(liveHighlights).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - headline: 'Fresh headline', - }), - ]); - - const retiredDigestHighlight = await con - .getRepository(PostHighlight) - .findOneByOrFail({ - channel: 'vibes', - postId: 'digest-post', - }); - expect(retiredDigestHighlight.retiredAt).toBeInstanceOf(Date); }); - it('should remove highlights that aged past the configured horizon', async () => { + it('should skip persistence when only shadow channels match', async () => { const now = new Date('2026-03-03T12:00:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'publish', - candidateHorizonHours: 24, - maxItems: 3, - }); - await saveArticle({ - id: 'expired-live', - title: 'Expired live story', - createdAt: new Date('2026-03-01T10:00:00.000Z'), - }); - await con.getRepository(PostHighlight).save({ - channel: 'vibes', - postId: 'expired-live', - highlightedAt: new Date('2026-03-02T10:00:00.000Z'), - headline: 'Expired headline', - }); - - const evaluatorSpy = jest.spyOn(evaluator, 'evaluateChannelHighlights'); + const triggerTypedEventSpy = jest + .spyOn(typedPubsub, 'triggerTypedEvent') + .mockResolvedValue(); - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, - { - channel: 'vibes', - scheduledAt: now.toISOString(), - }, - ); - - expect(evaluatorSpy).not.toHaveBeenCalled(); - - const liveHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', retiredAt: IsNull() }, - }); - expect(liveHighlights).toEqual([]); - const retiredHighlight = await con.getRepository(PostHighlight).findOneBy({ - channel: 'vibes', - postId: 'expired-live', - }); - expect(retiredHighlight?.retiredAt).toBeInstanceOf(Date); - }); - - it('should exclude posts older than the candidate horizon even when recently updated', async () => { - const now = new Date('2026-03-03T12:30:00.000Z'); await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', + channel: 'backend', mode: 'shadow', candidateHorizonHours: 72, maxItems: 3, }); await saveArticle({ - id: 'old-1', - title: 'Old but active', - createdAt: new Date('2026-02-20T12:00:00.000Z'), - statsUpdatedAt: new Date('2026-03-03T11:55:00.000Z'), - metadataChangedAt: new Date('2026-03-03T11:55:00.000Z'), - }); - await saveArticle({ - id: 'fresh-1', - title: 'Fresh candidate', + id: 'shadow-1', + title: 'Shadow story', createdAt: new Date('2026-03-03T11:30:00.000Z'), + channels: ['backend'], }); - const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') - .mockResolvedValue({ - items: [ - { - postId: 'fresh-1', - headline: 'Fresh headline', - significanceLabel: 'notable', - reason: 'test', - }, - ], - }); - - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, - { - channel: 'vibes', - scheduledAt: now.toISOString(), - }, - ); - - expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - title: 'Fresh candidate', - relatedItemsCount: 1, - }), - ]); - }); - - it('should replace unknown-source candidates with the most upvoted public share before evaluation', async () => { - const now = new Date('2026-03-03T12:40:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'publish', - candidateHorizonHours: 72, - maxItems: 3, - }); - await con - .getRepository(Source) - .save( - createSource( - UNKNOWN_SOURCE, - 'Unknown', - 'https://daily.dev/unknown.png', - undefined, - true, - ), - ); - await saveArticle({ - id: 'unk-orig-1', - sourceId: UNKNOWN_SOURCE, - title: 'Unknown source story', - createdAt: new Date('2026-03-03T12:20:00.000Z'), - }); - await saveShare({ - id: 'pub-share-1', - sharedPostId: 'unk-orig-1', - createdAt: new Date('2026-03-03T12:26:00.000Z'), - upvotes: 25, - }); - await saveShare({ - id: 'pub-share-2', - sharedPostId: 'unk-orig-1', - createdAt: new Date('2026-03-03T12:25:00.000Z'), - upvotes: 50, - }); - await saveShare({ - id: 'priv-share1', - sharedPostId: 'unk-orig-1', - createdAt: new Date('2026-03-03T12:27:00.000Z'), - upvotes: 100, - private: true, + jest.spyOn(evaluator, 'evaluateChannelHighlights').mockResolvedValue({ + items: [ + { + postId: 'shadow-1', + headline: 'Shadow highlight', + significanceLabel: 'major', + }, + ], }); - const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') - .mockImplementation(async ({ newCandidates }) => ({ - items: [ - { - postId: newCandidates[0].postId, - headline: 'Shared headline', - significanceLabel: 'breaking', - reason: 'test', - }, - ], - })); - - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + await expectSuccessfulTypedBackground<'api.v1.generate-highlights'>( worker, { - channel: 'vibes', scheduledAt: now.toISOString(), }, ); - expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ - expect.objectContaining({ - postId: 'pub-share-2', - title: 'Unknown source story', - }), - ]); - - const liveHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', retiredAt: IsNull() }, - }); - expect(liveHighlights).toEqual([ - expect.objectContaining({ - postId: 'pub-share-2', - headline: 'Shared headline', - significance: PostHighlightSignificance.Breaking, - reason: 'test', - }), - ]); + expect(await con.getRepository(PostHighlight).count()).toBe(0); + expect(await con.getRepository(PostHighlightChannel).count()).toBe(0); + expect(triggerTypedEventSpy).not.toHaveBeenCalledWith( + expect.anything(), + 'api.v1.highlight-created', + expect.anything(), + ); }); - it('should skip unknown-source candidates when no public share exists', async () => { - const now = new Date('2026-03-03T12:41:00.000Z'); + it('should retire displaced highlights when a newer published story takes the only slot', async () => { + const now = new Date('2026-03-03T12:00:00.000Z'); + await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', + channel: 'backend', mode: 'publish', candidateHorizonHours: 72, - maxItems: 3, + maxItems: 1, }); - await con - .getRepository(Source) - .save( - createSource( - UNKNOWN_SOURCE, - 'Unknown', - 'https://daily.dev/unknown.png', - undefined, - true, - ), - ); await saveArticle({ - id: 'unk-orig-2', - sourceId: UNKNOWN_SOURCE, - title: 'Unknown source story 2', - createdAt: new Date('2026-03-03T12:21:00.000Z'), - }); - await saveShare({ - id: 'priv-share2', - sharedPostId: 'unk-orig-2', - createdAt: new Date('2026-03-03T12:28:00.000Z'), - upvotes: 100, - private: true, + id: 'old-1', + title: 'Old story', + createdAt: new Date('2026-03-03T09:00:00.000Z'), + channels: ['backend'], }); - - const evaluatorSpy = jest.spyOn(evaluator, 'evaluateChannelHighlights'); - - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, - { - channel: 'vibes', - scheduledAt: now.toISOString(), - }, - ); - - expect(evaluatorSpy).not.toHaveBeenCalled(); - - const liveHighlights = await con.getRepository(PostHighlight).find({ - where: { channel: 'vibes', retiredAt: IsNull() }, + await saveArticle({ + id: 'fresh-2', + title: 'Fresh story', + createdAt: new Date('2026-03-03T11:30:00.000Z'), + channels: ['backend'], }); - expect(liveHighlights).toEqual([]); - }); - it('should exclude posts refreshed only by stats updates from incremental candidates', async () => { - const now = new Date('2026-03-03T12:45:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'shadow', - candidateHorizonHours: 72, - maxItems: 3, - lastFetchedAt: new Date('2026-03-03T12:20:00.000Z'), - }); - await saveArticle({ - id: 'stats-only-1', - title: 'Stats only refresh', - createdAt: new Date('2026-03-02T12:00:00.000Z'), - metadataChangedAt: new Date('2026-03-02T12:00:00.000Z'), - statsUpdatedAt: new Date('2026-03-03T12:40:00.000Z'), + const oldHighlight = await con.getRepository(PostHighlight).save({ + postId: 'old-1', + channel: 'backend', + highlightedAt: new Date('2026-03-03T10:00:00.000Z'), + headline: 'Old highlight', + significance: PostHighlightSignificance.Major, }); - await saveArticle({ - id: 'fresh-1', - title: 'Fresh candidate', - createdAt: new Date('2026-03-03T12:30:00.000Z'), + await con.getRepository(PostHighlightChannel).save({ + highlightId: oldHighlight.id, + channel: 'backend', + placedAt: oldHighlight.highlightedAt, }); - const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') - .mockResolvedValue({ - items: [ - { - postId: 'fresh-1', - headline: 'Fresh headline', - significanceLabel: 'notable', - reason: 'test', - }, - ], - }); + jest.spyOn(evaluator, 'evaluateChannelHighlights').mockResolvedValue({ + items: [ + { + postId: 'fresh-2', + headline: 'Fresh replacement', + significanceLabel: 'major', + }, + ], + }); - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( + await expectSuccessfulTypedBackground<'api.v1.generate-highlights'>( worker, { - channel: 'vibes', scheduledAt: now.toISOString(), }, ); - expect(evaluatorSpy).toHaveBeenCalledTimes(1); - expect(evaluatorSpy.mock.calls[0][0].newCandidates).toEqual([ - expect.objectContaining({ - postId: 'fresh-1', - title: 'Fresh candidate', - }), - ]); - }); + const retiredHighlight = await con + .getRepository(PostHighlight) + .findOneByOrFail({ + postId: 'old-1', + }); + const freshHighlight = await con + .getRepository(PostHighlight) + .findOneByOrFail({ + postId: 'fresh-2', + }); + const retiredPlacement = await con + .getRepository(PostHighlightChannel) + .findOneByOrFail({ + highlightId: oldHighlight.id, + channel: 'backend', + }); - it('should exclude posts with rejected content curation types from candidates', async () => { - const now = new Date('2026-03-03T13:00:00.000Z'); - await con.getRepository(ChannelHighlightDefinition).save({ - channel: 'vibes', - mode: 'shadow', - candidateHorizonHours: 72, - maxItems: 5, - }); - await saveArticle({ - id: 'news-1', - title: 'News story', - createdAt: new Date('2026-03-03T12:00:00.000Z'), - contentCuration: ['news'], - }); - await saveArticle({ - id: 'release-1', - title: 'Release story', - createdAt: new Date('2026-03-03T12:10:00.000Z'), - contentCuration: ['release'], - }); - await saveArticle({ - id: 'tutorial-1', - title: 'Tutorial post', - createdAt: new Date('2026-03-03T12:20:00.000Z'), - contentCuration: ['tutorial'], - }); - await saveArticle({ - id: 'opinion-1', - title: 'Opinion post', - createdAt: new Date('2026-03-03T12:30:00.000Z'), - contentCuration: ['opinion'], - }); - await saveArticle({ - id: 'listicle-1', - title: 'Listicle post', - createdAt: new Date('2026-03-03T12:40:00.000Z'), - contentCuration: ['listicle'], + expect(retiredHighlight.retiredAt).toBeInstanceOf(Date); + expect(retiredPlacement.retiredAt).toBeInstanceOf(Date); + expect(freshHighlight).toMatchObject({ + postId: 'fresh-2', + retiredAt: null, + headline: 'Fresh replacement', + significance: PostHighlightSignificance.Major, }); - - const evaluatorSpy = jest - .spyOn(evaluator, 'evaluateChannelHighlights') - .mockResolvedValue({ items: [] }); - - await expectSuccessfulTypedBackground<'api.v1.generate-channel-highlight'>( - worker, - { - channel: 'vibes', - scheduledAt: now.toISOString(), - }, - ); - - expect(evaluatorSpy).toHaveBeenCalledTimes(1); - const candidateIds = evaluatorSpy.mock.calls[0][0].newCandidates.map( - (c: { postId: string }) => c.postId, - ); - expect(candidateIds).toContain('news-1'); - expect(candidateIds).toContain('release-1'); - expect(candidateIds).not.toContain('tutorial-1'); - expect(candidateIds).not.toContain('opinion-1'); - expect(candidateIds).not.toContain('listicle-1'); }); }); diff --git a/__tests__/workers/majorHighlightTweet.ts b/__tests__/workers/majorHighlightTweet.ts index 1cd88046ae..43b422efe9 100644 --- a/__tests__/workers/majorHighlightTweet.ts +++ b/__tests__/workers/majorHighlightTweet.ts @@ -20,7 +20,6 @@ const clearMajorHighlightTweetLocks = () => const createEvent = ( overrides: Partial<{ highlightId: string; - channel: string; postId: string; headline: string; significance: PostHighlightSignificance; @@ -28,7 +27,6 @@ const createEvent = ( }> = {}, ) => ({ highlightId: 'c', - channel: 'ai', postId: 'post-id', headline: 'Highlight headline', significance: PostHighlightSignificance.Breaking, @@ -80,7 +78,7 @@ describe('majorHighlightTweet worker', () => { }); it('should publish tweet for breaking highlights', async () => { - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, createEvent({ highlightId: 'c', @@ -95,7 +93,7 @@ describe('majorHighlightTweet worker', () => { }); it('should publish tweet for major highlights', async () => { - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, createEvent({ highlightId: 'c', @@ -111,7 +109,7 @@ describe('majorHighlightTweet worker', () => { }); it('should vary the prefix within the configured breaking options', async () => { - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, createEvent({ highlightId: 'b', @@ -128,7 +126,7 @@ describe('majorHighlightTweet worker', () => { it('should publish only one tweet for multiple highlights on the same post', async () => { const postId = 'post-dedup'; - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, createEvent({ highlightId: 'c', @@ -137,11 +135,10 @@ describe('majorHighlightTweet worker', () => { }), ); - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, createEvent({ highlightId: 'highlight-dedup-2', - channel: 'opensource', postId, headline: 'Second highlight headline', }), @@ -160,11 +157,11 @@ describe('majorHighlightTweet worker', () => { headline: 'Retry highlight headline', }); - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, event, ); - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, event, ); @@ -176,7 +173,7 @@ describe('majorHighlightTweet worker', () => { }); it('should skip non-major highlights', async () => { - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, createEvent({ highlightId: 'highlight-routine', @@ -190,7 +187,7 @@ describe('majorHighlightTweet worker', () => { }); it('should publish the full headline without trimming or truncating', async () => { - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>( worker, createEvent({ highlightId: 'c', diff --git a/__tests__/workers/newHighlightRealTime.ts b/__tests__/workers/newHighlightRealTime.ts index b1b48433f7..1b6b5cdaed 100644 --- a/__tests__/workers/newHighlightRealTime.ts +++ b/__tests__/workers/newHighlightRealTime.ts @@ -1,8 +1,8 @@ import { DataSource } from 'typeorm'; -import { PostHighlightedMessage } from '@dailydotdev/schema'; import createOrGetConnection from '../../src/db'; import { NEW_HIGHLIGHT_CHANNEL } from '../../src/common/highlights'; import { ArticlePost, PostHighlight, Source } from '../../src/entity'; +import { PostHighlightChannel } from '../../src/entity/PostHighlightChannel'; import { PostType } from '../../src/entity/posts/Post'; import { SourceType } from '../../src/entity/Source'; import { redisPubSub } from '../../src/redis'; @@ -32,7 +32,8 @@ describe('newHighlightRealTime worker', () => { }); afterEach(async () => { - await con.getRepository(PostHighlight).clear(); + await con.getRepository(PostHighlightChannel).clear(); + await con.createQueryBuilder().delete().from(PostHighlight).execute(); await con.getRepository(ArticlePost).delete(['highlight-post']); await con.getRepository(Source).delete(['highlight-source']); }); @@ -69,14 +70,20 @@ describe('newHighlightRealTime worker', () => { highlightedAt: new Date('2026-04-26T10:05:00.000Z'), headline: 'New backend highlight', }); + await con.getRepository(PostHighlightChannel).save({ + highlightId: highlight.id, + channel: 'backend', + placedAt: highlight.highlightedAt, + }); const publishSpy = jest.spyOn(redisPubSub, 'publish'); - await expectSuccessfulTypedBackground<'api.v1.post-highlighted'>( - worker, - new PostHighlightedMessage({ - highlightId: highlight.id, - }), - ); + await expectSuccessfulTypedBackground<'api.v1.highlight-created'>(worker, { + highlightId: highlight.id, + postId: 'highlight-post', + headline: 'New backend highlight', + significance: 0, + highlightedAt: highlight.highlightedAt.toISOString(), + }); expect(publishSpy).toHaveBeenCalledTimes(1); expect(publishSpy).toHaveBeenCalledWith( diff --git a/__tests__/workers/notifications/majorHeadlineAdded.ts b/__tests__/workers/notifications/majorHeadlineAdded.ts index 1b0417c436..44bc22a9c0 100644 --- a/__tests__/workers/notifications/majorHeadlineAdded.ts +++ b/__tests__/workers/notifications/majorHeadlineAdded.ts @@ -1,5 +1,4 @@ import { DataSource } from 'typeorm'; -import { PostHighlightedMessage } from '@dailydotdev/schema'; import createOrGetConnection from '../../../src/db'; import { Post, PostType, Source, User } from '../../../src/entity'; import { PostHighlightSignificance } from '../../../src/entity/PostHighlight'; @@ -28,18 +27,16 @@ beforeEach(async () => { const baseMessage = { highlightId: 'h1', - channel: 'global', postId: 'p1', headline: 'Breaking news headline', significance: PostHighlightSignificance.Breaking, - reason: undefined, highlightedAt: new Date(), }; const invoke = (overrides: Partial = {}) => - invokeTypedNotificationWorker<'api.v1.post-highlighted'>( + invokeTypedNotificationWorker<'api.v1.highlight-created'>( majorHeadlineAdded, - new PostHighlightedMessage({ ...baseMessage, ...overrides }), + { ...baseMessage, ...overrides }, ); describe('majorHeadlineAdded notification worker', () => { @@ -60,7 +57,6 @@ describe('majorHeadlineAdded notification worker', () => { expect(result?.[0]?.type).toEqual(NotificationType.MajorHeadlineAdded); expect(result?.[0]?.ctx).toMatchObject({ headline: baseMessage.headline, - channel: baseMessage.channel, significance: PostHighlightSignificance.Breaking, }); }); diff --git a/src/common/channelHighlight/decisions.ts b/src/common/channelHighlight/decisions.ts index 73e8f1a286..43931e8d48 100644 --- a/src/common/channelHighlight/decisions.ts +++ b/src/common/channelHighlight/decisions.ts @@ -1,16 +1,18 @@ import type { HighlightItem } from './types'; const toItemSignature = (item: { + channel: string; + channels: string[]; postId: string; headline: string; significanceLabel: string | null; - reason: string | null; }): string => [ + item.channel, + item.channels.join(','), item.postId, item.headline, item.significanceLabel || '', - item.reason || '', ].join('|'); export const compareSnapshots = ({ diff --git a/src/common/channelHighlight/evaluate.ts b/src/common/channelHighlight/evaluate.ts index a4256981b4..323c30a585 100644 --- a/src/common/channelHighlight/evaluate.ts +++ b/src/common/channelHighlight/evaluate.ts @@ -20,7 +20,6 @@ export type EvaluatedHighlightItem = { postId: string; headline: string; significanceLabel: string | null; - reason: string; }; export type EvaluateChannelHighlightsResponse = { @@ -134,7 +133,6 @@ export const evaluateChannelHighlights = async ({ postId: item.postId, headline: item.headline, significanceLabel: toSignificanceLabel(item.significance), - reason: item.reason || '', }; }), }; diff --git a/src/common/channelHighlight/generate.ts b/src/common/channelHighlight/generate.ts index face68670d..54e98e142c 100644 --- a/src/common/channelHighlight/generate.ts +++ b/src/common/channelHighlight/generate.ts @@ -1,12 +1,11 @@ import type { DataSource } from 'typeorm'; import { logger as baseLogger } from '../../logger'; import { ChannelHighlightDefinition } from '../../entity/ChannelHighlightDefinition'; -import { ChannelHighlightRun } from '../../entity/ChannelHighlightRun'; import { UNKNOWN_SOURCE } from '../../entity/Source'; +import { getChannelHighlightDefinitions } from './definitions'; import { getChannelDigestSourceIds } from '../channelDigest/definitions'; -import { compareSnapshots } from './decisions'; import { evaluateChannelHighlights } from './evaluate'; -import { replaceHighlightsForChannel } from './publish'; +import { replaceHighlights } from './publish'; import { fetchCurrentHighlights, fetchEvaluationHistoryHighlights, @@ -15,7 +14,7 @@ import { fetchPublicShareFallbackPostIds, fetchRetiredHighlightPostIds, fetchRelations, - getEvaluationHistoryStart, + getDefinitionsHorizonHours, getFetchStart, getHorizonStart, mergePosts, @@ -26,54 +25,152 @@ import { buildCandidates, canonicalizeCurrentHighlights, toHighlightItem, - toStoredSnapshotItem, } from './stories'; -import type { GenerateChannelHighlightResult, HighlightItem } from './types'; +import type { + GenerateHighlightsResult, + HighlightItem, + HighlightPost, +} from './types'; -const trimHighlights = ({ +const sortDefinitions = ( + definitions: ChannelHighlightDefinition[], +): ChannelHighlightDefinition[] => + [...definitions].sort( + (left, right) => + left.order - right.order || left.channel.localeCompare(right.channel), + ); + +const sortHighlights = (items: HighlightItem[]): HighlightItem[] => + [...items].sort( + (left, right) => + right.highlightedAt.getTime() - left.highlightedAt.getTime(), + ); + +const getPostChannels = ({ post }: { post?: HighlightPost }): string[] => { + const channels = (post?.contentMeta as { channels?: unknown } | undefined) + ?.channels; + + if (!Array.isArray(channels)) { + return []; + } + + return channels.filter( + (channel): channel is string => typeof channel === 'string' && !!channel, + ); +}; + +const toPrimaryChannel = ({ + channels, + definitions, +}: { + channels: string[]; + definitions: ChannelHighlightDefinition[]; +}): string | null => { + if (!channels.length) { + return null; + } + + const orderByChannel = new Map( + definitions.map((definition, index) => [definition.channel, index]), + ); + + return [...channels].sort((left, right) => { + const leftOrder = orderByChannel.get(left) ?? Number.MAX_SAFE_INTEGER; + const rightOrder = orderByChannel.get(right) ?? Number.MAX_SAFE_INTEGER; + + return leftOrder - rightOrder || left.localeCompare(right); + })[0]; +}; + +const selectPublishedHighlights = ({ + definitions, items, - maxItems, + postsById, + now, }: { + definitions: ChannelHighlightDefinition[]; items: HighlightItem[]; - maxItems: number; -}): HighlightItem[] => - [...items] - .sort( - (left, right) => - right.highlightedAt.getTime() - left.highlightedAt.getTime(), - ) - .slice(0, maxItems); + postsById: Map; + now: Date; +}): HighlightItem[] => { + const publishedDefinitions = sortDefinitions( + definitions.filter((definition) => definition.mode === 'publish'), + ); + const channelsByPostId = new Map(); + + for (const definition of publishedDefinitions) { + const horizonStart = getHorizonStart({ + now, + horizonHours: definition.candidateHorizonHours, + }); + const matchedItems = sortHighlights( + items.filter((item) => { + if (item.highlightedAt < horizonStart) { + return false; + } + + const post = postsById.get(item.postId); + return getPostChannels({ post }).includes(definition.channel); + }), + ).slice(0, definition.maxItems); + + for (const item of matchedItems) { + const channels = channelsByPostId.get(item.postId) || []; + if (!channels.includes(definition.channel)) { + channels.push(definition.channel); + channelsByPostId.set(item.postId, channels); + } + } + } + + return sortHighlights( + items + .map((item) => { + const channels = channelsByPostId.get(item.postId); + if (!channels?.length) { + return null; + } -// High-level flow: -// 1. Keep only currently highlighted items that are still inside the horizon. -// 2. Canonicalize those highlights to collections on the API side. -// 3. Build new canonical candidate posts from incremental post/relation fetches. -// 4. Ask the evaluator only about new candidates. -// 5. Append admitted items, trim FIFO by maxItems, then publish if the surface changed. -export const generateChannelHighlight = async ({ + const primaryChannel = toPrimaryChannel({ + channels, + definitions: publishedDefinitions, + }); + if (!primaryChannel) { + return null; + } + + return { + ...item, + channel: primaryChannel, + channels, + }; + }) + .filter((item): item is HighlightItem => !!item), + ); +}; + +export const generateHighlights = async ({ con, - definition, now = new Date(), }: { con: DataSource; - definition: ChannelHighlightDefinition; now?: Date; -}): Promise => { - const runRepo = con.getRepository(ChannelHighlightRun); - const run = await runRepo.save( - runRepo.create({ - channel: definition.channel, - scheduledAt: now, - status: 'processing', - baselineSnapshot: [], - inputSummary: {}, - internalSnapshot: [], - comparison: {}, - metrics: {}, - }), - ); +}): Promise => { + const definitions = await getChannelHighlightDefinitions({ + con, + }); + const horizonHours = getDefinitionsHorizonHours({ + definitions, + }); + + if (!definitions.length || horizonHours <= 0) { + return { + createdHighlights: [], + }; + } try { + const channels = definitions.map((definition) => definition.channel); const [ currentHighlights, retiredHighlightPostIds, @@ -82,48 +179,45 @@ export const generateChannelHighlight = async ({ ] = await Promise.all([ fetchCurrentHighlights({ con, - channel: definition.channel, }), fetchRetiredHighlightPostIds({ con, - channel: definition.channel, }), getChannelDigestSourceIds({ con, }), fetchEvaluationHistoryHighlights({ con, - channel: definition.channel, now, }), ]); const horizonStart = getHorizonStart({ now, - definition, + horizonHours, }); const fetchStart = getFetchStart({ now, - definition, + definitions, }); - const baselineHighlights = currentHighlights.map(toHighlightItem); const activeHighlights = baselineHighlights.filter( (item) => item.highlightedAt >= horizonStart, ); - const highlightedPostIds = activeHighlights.map((item) => item.postId); const evaluationHistoryPostIds = evaluationHistoryHighlights.map( (item) => item.postId, ); const [incrementalPosts, highlightedPosts, evaluationHistoryPosts] = await Promise.all([ - fetchIncrementalPosts({ - con, - channel: definition.channel, - fetchStart, - horizonStart, - excludedSourceIds, - }), + channels.length + ? fetchIncrementalPosts({ + con, + channels, + fetchStart, + horizonStart, + excludedSourceIds, + }) + : Promise.resolve([]), fetchPostsByIds({ con, ids: highlightedPostIds, @@ -157,6 +251,7 @@ export const generateChannelHighlight = async ({ excludedSourceIds, }); const availablePosts = mergePosts([basePosts, relationPosts]); + const postsById = new Map(availablePosts.map((post) => [post.id, post])); const inaccessiblePostIds = new Set( availablePosts .filter((post) => post.sourceId === UNKNOWN_SOURCE) @@ -201,7 +296,6 @@ export const generateChannelHighlight = async ({ inaccessiblePostIds, fallbackPostIds, }); - const currentHighlightPostIds = new Set( liveHighlights.map((item) => item.postId), ); @@ -227,121 +321,69 @@ export const generateChannelHighlight = async ({ !retiredHighlightPostIdSet.has(candidate.postId) && !retiredEvaluationPostIdSet.has(candidate.postId), ); - const admittedHighlights = newCandidates.length === 0 ? [] : ( await evaluateChannelHighlights({ - channel: definition.channel, - targetAudience: - definition.targetAudience.trim() || - `daily.dev readers following ${definition.channel}`, - maxItems: definition.maxItems, + channel: 'highlights', + targetAudience: 'daily.dev readers', + maxItems: newCandidates.length, currentHighlights: evaluationHighlights, newCandidates, }) - ).items.map((item) => ({ - postId: item.postId, - headline: item.headline, - highlightedAt: now, - significanceLabel: item.significanceLabel, - reason: item.reason, - })); + ).items + .map((item) => { + const post = postsById.get(item.postId); + const itemChannels = getPostChannels({ post }); + const primaryChannel = toPrimaryChannel({ + channels: itemChannels, + definitions: sortDefinitions(definitions), + }); - const internalHighlights = trimHighlights({ - items: [...liveHighlights, ...admittedHighlights], - maxItems: definition.maxItems, - }); - const comparison = compareSnapshots({ - baseline: baselineHighlights, - internal: internalHighlights, - }); - const publish = definition.mode === 'publish' && comparison.changed; + if (!primaryChannel) { + return null; + } - await con.transaction(async (manager) => { - await manager.getRepository(ChannelHighlightDefinition).update( - { channel: definition.channel }, - { + return { + channel: primaryChannel, + channels: [primaryChannel], + postId: item.postId, + headline: item.headline, + highlightedAt: now, + significanceLabel: item.significanceLabel, + }; + }) + .filter((item): item is HighlightItem => !!item); + const nextHighlights = selectPublishedHighlights({ + definitions, + items: sortHighlights([...liveHighlights, ...admittedHighlights]), + postsById, + now, + }); + const createdHighlights = await con.transaction(async (manager) => { + await manager + .createQueryBuilder() + .update(ChannelHighlightDefinition) + .set({ lastFetchedAt: now, - }, - ); - - if (publish) { - await replaceHighlightsForChannel({ - manager, - channel: definition.channel, - items: internalHighlights, - }); - } + }) + .where('"channel" IN (:...channels)', { + channels: definitions.map((definition) => definition.channel), + }) + .execute(); - await manager.getRepository(ChannelHighlightRun).update( - { id: run.id }, - { - status: 'completed', - completedAt: new Date(), - baselineSnapshot: baselineHighlights.map(toStoredSnapshotItem), - inputSummary: { - fetchStart: fetchStart.toISOString(), - horizonStart: horizonStart.toISOString(), - evaluationHistoryStart: getEvaluationHistoryStart({ - now, - }).toISOString(), - excludedSourceIds, - currentHighlightPostIds: liveHighlights.map((item) => item.postId), - evaluationHighlightPostIds: evaluationHighlights.map( - (item) => item.postId, - ), - retiredEvaluationHighlightPostIds: retiredEvaluationHighlights.map( - (item) => item.postId, - ), - retiredHighlightPostIds, - candidatePostIds: newCandidates.map( - (candidate) => candidate.postId, - ), - }, - internalSnapshot: internalHighlights.map(toStoredSnapshotItem), - comparison: { - ...comparison, - wouldPublish: comparison.changed, - published: publish, - }, - metrics: { - fetchedPosts: incrementalPosts.length + highlightedPosts.length, - relationPosts: relationPosts.length, - currentHighlights: baselineHighlights.length, - activeHighlights: activeHighlights.length, - canonicalizedHighlights: liveHighlights.length, - evaluationHighlights: evaluationHighlights.length, - retiredEvaluationHighlights: retiredEvaluationHighlights.length, - newCandidates: newCandidates.length, - admittedHighlights: admittedHighlights.length, - }, - }, - ); + return replaceHighlights({ + manager, + items: nextHighlights, + }); }); return { - run: await runRepo.findOneByOrFail({ - id: run.id, - }), - published: publish, + createdHighlights, }; } catch (err) { - baseLogger.error( - { err, channel: definition.channel }, - 'Failed channel highlight run', - ); - await runRepo.update( - { id: run.id }, - { - status: 'failed', - completedAt: new Date(), - error: { - message: err instanceof Error ? err.message : 'Unknown error', - }, - }, - ); + baseLogger.error({ err }, 'Failed global highlight run'); throw err; } }; diff --git a/src/common/channelHighlight/publish.ts b/src/common/channelHighlight/publish.ts index dbaf24da6f..f6deeb2869 100644 --- a/src/common/channelHighlight/publish.ts +++ b/src/common/channelHighlight/publish.ts @@ -1,89 +1,152 @@ import type { EntityManager } from 'typeorm'; import { PostHighlight, + PostHighlightSignificance, toPostHighlightSignificance, } from '../../entity/PostHighlight'; +import { PostHighlightChannel } from '../../entity/PostHighlightChannel'; import type { HighlightItem } from './types'; const normalizeHighlightItems = ({ items, - retiredPostIds, }: { items: HighlightItem[]; - retiredPostIds: Set; }): HighlightItem[] => { const dedupedItems = new Map(); for (const item of items) { - if (retiredPostIds.has(item.postId) || dedupedItems.has(item.postId)) { + if (dedupedItems.has(item.postId)) { continue; } - dedupedItems.set(item.postId, item); + dedupedItems.set(item.postId, { + ...item, + channels: [...new Set(item.channels)].sort(), + }); } return [...dedupedItems.values()]; }; -export const replaceHighlightsForChannel = async ({ +export const replaceHighlights = async ({ manager, - channel, items, }: { manager: EntityManager; - channel: string; items: HighlightItem[]; -}): Promise => { - const repo = manager.getRepository(PostHighlight); - const highlights = await repo.find({ - where: { - channel, - }, - }); - const currentHighlights = highlights.filter((item) => !item.retiredAt); +}): Promise< + { + highlightId: string; + postId: string; + headline: string; + significance: PostHighlightSignificance; + highlightedAt: string; + }[] +> => { + const highlightRepo = manager.getRepository(PostHighlight); + const placementRepo = manager.getRepository(PostHighlightChannel); + const now = new Date(); const nextItems = normalizeHighlightItems({ items, - retiredPostIds: new Set( - highlights.filter((item) => item.retiredAt).map((item) => item.postId), - ), }); - const currentByPostId = new Map( - currentHighlights.map((item) => [item.postId, item]), + const [existingHighlights, existingPlacements] = await Promise.all([ + highlightRepo.find(), + placementRepo.find(), + ]); + const existingHighlightByPostId = new Map( + existingHighlights.map((item) => [item.postId, item]), ); const nextPostIds = new Set(nextItems.map((item) => item.postId)); - const retiredPostIds = currentHighlights - .filter((item) => !nextPostIds.has(item.postId)) - .map((item) => item.postId); + const retiredHighlightIds = existingHighlights + .filter((item) => !item.retiredAt && !nextPostIds.has(item.postId)) + .map((item) => item.id); - if (retiredPostIds.length) { - await repo + if (retiredHighlightIds.length) { + await highlightRepo .createQueryBuilder() .update() - .set({ retiredAt: new Date() }) - .where('"channel" = :channel', { channel }) - .andWhere('"retiredAt" IS NULL') - .andWhere('"postId" IN (:...postIds)', { postIds: retiredPostIds }) + .set({ retiredAt: now }) + .where('id IN (:...ids)', { ids: retiredHighlightIds }) .execute(); } - if (!nextItems.length) { - return; - } - - await repo.save( + const savedHighlights = await highlightRepo.save( nextItems.map((item) => { - const currentHighlight = currentByPostId.get(item.postId); + const existingHighlight = existingHighlightByPostId.get(item.postId); - return repo.create({ - id: currentHighlight?.id, - channel, + return highlightRepo.create({ + id: existingHighlight?.id, + channel: item.channel, postId: item.postId, highlightedAt: item.highlightedAt, headline: item.headline, significance: toPostHighlightSignificance(item.significanceLabel), - reason: item.reason, retiredAt: null, }); }), ); + const savedHighlightByPostId = new Map( + savedHighlights.map((item) => [item.postId, item]), + ); + const nextPlacementKeys = new Set( + nextItems.flatMap((item) => { + const savedHighlight = savedHighlightByPostId.get(item.postId); + if (!savedHighlight) { + return []; + } + + return item.channels.map((channel) => `${savedHighlight.id}:${channel}`); + }), + ); + const retiredPlacements = existingPlacements.filter( + (placement) => + !placement.retiredAt && + !nextPlacementKeys.has(`${placement.highlightId}:${placement.channel}`), + ); + + if (retiredPlacements.length) { + await Promise.all( + retiredPlacements.map((placement) => + placementRepo.update( + { + highlightId: placement.highlightId, + channel: placement.channel, + }, + { + retiredAt: now, + }, + ), + ), + ); + } + + const nextPlacements = nextItems.flatMap((item) => { + const savedHighlight = savedHighlightByPostId.get(item.postId); + if (!savedHighlight) { + return []; + } + + return item.channels.map((channel) => + placementRepo.create({ + highlightId: savedHighlight.id, + channel, + placedAt: item.highlightedAt, + retiredAt: null, + }), + ); + }); + + if (nextPlacements.length) { + await placementRepo.save(nextPlacements); + } + + return savedHighlights + .filter((item) => !existingHighlightByPostId.has(item.postId)) + .map((item) => ({ + highlightId: item.id, + postId: item.postId, + headline: item.headline, + significance: item.significance, + highlightedAt: item.highlightedAt.toISOString(), + })); }; diff --git a/src/common/channelHighlight/queries.ts b/src/common/channelHighlight/queries.ts index fefa587d6d..cc526bd366 100644 --- a/src/common/channelHighlight/queries.ts +++ b/src/common/channelHighlight/queries.ts @@ -32,37 +32,51 @@ const HIGHLIGHT_EVALUATION_HISTORY_SECONDS = ONE_WEEK_IN_SECONDS; export const getHorizonStart = ({ now, - definition, + horizonHours, }: { now: Date; - definition: Pick; -}): Date => - new Date( - now.getTime() - - definition.candidateHorizonHours * ONE_HOUR_IN_SECONDS * 1000, + horizonHours: number; +}): Date => new Date(now.getTime() - horizonHours * ONE_HOUR_IN_SECONDS * 1000); + +export const getDefinitionsHorizonHours = ({ + definitions, +}: { + definitions: Pick[]; +}): number => + definitions.reduce( + (maxHours, definition) => + Math.max(maxHours, definition.candidateHorizonHours), + 0, ); export const getFetchStart = ({ now, - definition, + definitions, }: { now: Date; - definition: Pick< + definitions: Pick< ChannelHighlightDefinition, 'candidateHorizonHours' | 'lastFetchedAt' - >; + >[]; }): Date => { + const horizonHours = getDefinitionsHorizonHours({ + definitions, + }); const horizonStart = getHorizonStart({ now, - definition, + horizonHours, }); - if (!definition.lastFetchedAt) { + const fetchedAtTimestamps = definitions + .map((definition) => definition.lastFetchedAt?.getTime()) + .filter((timestamp): timestamp is number => typeof timestamp === 'number'); + + if (fetchedAtTimestamps.length !== definitions.length) { return horizonStart; } const overlapStart = new Date( - definition.lastFetchedAt.getTime() - HIGHLIGHT_FETCH_OVERLAP_SECONDS * 1000, + Math.min(...fetchedAtTimestamps) - HIGHLIGHT_FETCH_OVERLAP_SECONDS * 1000, ); return overlapStart > horizonStart ? overlapStart : horizonStart; @@ -70,14 +84,11 @@ export const getFetchStart = ({ export const fetchCurrentHighlights = async ({ con, - channel, }: { con: DataSource; - channel: string; }): Promise => con.getRepository(PostHighlight).find({ where: { - channel, retiredAt: IsNull(), }, order: { @@ -90,16 +101,13 @@ export const getEvaluationHistoryStart = ({ now }: { now: Date }): Date => export const fetchEvaluationHistoryHighlights = async ({ con, - channel, now, }: { con: DataSource; - channel: string; now: Date; }): Promise => con.getRepository(PostHighlight).find({ where: { - channel, highlightedAt: MoreThanOrEqual( getEvaluationHistoryStart({ now, @@ -113,17 +121,14 @@ export const fetchEvaluationHistoryHighlights = async ({ export const fetchRetiredHighlightPostIds = async ({ con, - channel, }: { con: DataSource; - channel: string; }): Promise => { const highlights = await con.getRepository(PostHighlight).find({ select: { postId: true, }, where: { - channel, retiredAt: Not(IsNull()), }, }); @@ -159,13 +164,13 @@ export const fetchPostsByIds = async ({ export const fetchIncrementalPosts = async ({ con, - channel, + channels, fetchStart, horizonStart, excludedSourceIds = [], }: { con: DataSource; - channel: string; + channels: string[]; fetchStart: Date; horizonStart: Date; excludedSourceIds?: string[]; @@ -179,7 +184,23 @@ export const fetchIncrementalPosts = async ({ .andWhere('post.banned = false') .andWhere('post.showOnFeed = true') .andWhere('post.sharedPostId IS NULL') - .andWhere(`(post."contentMeta"->'channels') ? :channel`, { channel }) + .andWhere( + new Brackets((builder) => { + channels.forEach((channel, index) => { + const clause = `(post."contentMeta"->'channels') ? :channel${index}`; + if (index === 0) { + builder.where(clause, { + [`channel${index}`]: channel, + }); + return; + } + + builder.orWhere(clause, { + [`channel${index}`]: channel, + }); + }); + }), + ) .andWhere(`NOT (post."contentCuration" && :rejectedCurations)`, { rejectedCurations: REJECTED_CONTENT_CURATIONS, }) diff --git a/src/common/channelHighlight/stories.ts b/src/common/channelHighlight/stories.ts index 937d610bf1..48a786cff3 100644 --- a/src/common/channelHighlight/stories.ts +++ b/src/common/channelHighlight/stories.ts @@ -191,14 +191,15 @@ export const buildCandidates = ({ export const toHighlightItem = ( item: Pick< CurrentHighlight, - 'postId' | 'headline' | 'highlightedAt' | 'significance' | 'reason' + 'channel' | 'postId' | 'headline' | 'highlightedAt' | 'significance' >, ): HighlightItem => ({ + channel: item.channel, + channels: [item.channel], postId: item.postId, headline: item.headline, highlightedAt: item.highlightedAt, significanceLabel: toPostHighlightSignificanceLabel(item.significance), - reason: item.reason, }); export const canonicalizeCurrentHighlights = ({ @@ -231,11 +232,12 @@ export const canonicalizeCurrentHighlights = ({ } canonicalHighlights.set(canonicalPostId, { + channel: highlight.channel, + channels: [...highlight.channels], postId: canonicalPostId, headline: highlight.headline, highlightedAt: highlight.highlightedAt, significanceLabel: highlight.significanceLabel, - reason: highlight.reason, }); } @@ -312,15 +314,17 @@ export const applyPublicShareFallbackToHighlights = ({ export const toStoredSnapshotItem = ( item: HighlightItem, ): { + channel: string; + channels: string[]; postId: string; headline: string; highlightedAt: string; significanceLabel: string | null; - reason: string | null; } => ({ + channel: item.channel, + channels: item.channels, postId: item.postId, headline: item.headline, highlightedAt: item.highlightedAt.toISOString(), significanceLabel: item.significanceLabel, - reason: item.reason, }); diff --git a/src/common/channelHighlight/types.ts b/src/common/channelHighlight/types.ts index 9e4803dab3..dc4bc92be0 100644 --- a/src/common/channelHighlight/types.ts +++ b/src/common/channelHighlight/types.ts @@ -1,5 +1,4 @@ import type { Post } from '../../entity/posts/Post'; -import type { ChannelHighlightRun } from '../../entity/ChannelHighlightRun'; import type { PostHighlightSignificance } from '../../entity/PostHighlight'; export type HighlightPost = Pick< @@ -53,22 +52,28 @@ export type HighlightCandidate = { export type CurrentHighlight = { id: string; + channel: string; postId: string; highlightedAt: Date; headline: string; significance: PostHighlightSignificance; - reason: string | null; }; export type HighlightItem = { postId: string; + channel: string; + channels: string[]; headline: string; highlightedAt: Date; significanceLabel: string | null; - reason: string | null; }; -export type GenerateChannelHighlightResult = { - run: ChannelHighlightRun; - published: boolean; +export type GenerateHighlightsResult = { + createdHighlights: { + highlightId: string; + postId: string; + headline: string; + significance: PostHighlightSignificance; + highlightedAt: string; + }[]; }; diff --git a/src/common/typedPubsub.ts b/src/common/typedPubsub.ts index 6a5534ba0a..92f30ae6c3 100644 --- a/src/common/typedPubsub.ts +++ b/src/common/typedPubsub.ts @@ -37,7 +37,6 @@ import { MatchedCandidate, type OpportunityMessage, type OpportunityPreviewResult, - PostHighlightedMessage, RecruiterAcceptedCandidateMatchMessage, type TransferResponse, type UserBriefingRequest, @@ -206,7 +205,13 @@ export type PubSubSchema = { }; 'skadi.v2.campaign-updated': CampaignUpdateEventArgs; 'api.v1.post-metrics-updated': z.infer; - 'api.v1.post-highlighted': PostHighlightedMessage; + 'api.v1.highlight-created': { + highlightId: Post['id']; + postId: Post['id']; + headline: string; + significance: number; + highlightedAt: string; + }; 'api.v1.reputation-event': { op: ChangeMessage['payload']['op']; payload: ChangeObject; @@ -314,8 +319,7 @@ export type PubSubSchema = { digestKey: string; scheduledAt: string; }; - 'api.v1.generate-channel-highlight': { - channel: string; + 'api.v1.generate-highlights': { scheduledAt: string; }; }; diff --git a/src/cron/channelHighlights.ts b/src/cron/channelHighlights.ts index 5e67545b38..f1c511e913 100644 --- a/src/cron/channelHighlights.ts +++ b/src/cron/channelHighlights.ts @@ -1,23 +1,14 @@ -import { getChannelHighlightDefinitions } from '../common/channelHighlight/definitions'; import { triggerTypedEvent } from '../common/typedPubsub'; import { Cron } from './cron'; const cron: Cron = { name: 'channel-highlights', - handler: async (con, logger) => { + handler: async (_, logger) => { const scheduledAt = new Date().toISOString(); - const definitions = await getChannelHighlightDefinitions({ - con, - }); - await Promise.all( - definitions.map(({ channel }) => - triggerTypedEvent(logger, 'api.v1.generate-channel-highlight', { - channel, - scheduledAt, - }), - ), - ); + await triggerTypedEvent(logger, 'api.v1.generate-highlights', { + scheduledAt, + }); }, }; diff --git a/src/cron/cleanChannelHighlights.ts b/src/cron/cleanChannelHighlights.ts index e682e463ff..b1efd36cb2 100644 --- a/src/cron/cleanChannelHighlights.ts +++ b/src/cron/cleanChannelHighlights.ts @@ -3,6 +3,7 @@ import { sub } from 'date-fns'; import { Cron } from './cron'; import { ChannelHighlightRun } from '../entity/ChannelHighlightRun'; import { PostHighlight } from '../entity/PostHighlight'; +import { PostHighlightChannel } from '../entity/PostHighlightChannel'; const HIGHLIGHT_RETENTION_DAYS = 30; const RUN_RETENTION_DAYS = 7; @@ -12,6 +13,9 @@ export const cleanChannelHighlights: Cron = { handler: async (con) => { const highlightCutoff = sub(new Date(), { days: HIGHLIGHT_RETENTION_DAYS }); const runCutoff = sub(new Date(), { days: RUN_RETENTION_DAYS }); + await con.getRepository(PostHighlightChannel).delete({ + retiredAt: LessThan(highlightCutoff), + }); await con.getRepository(PostHighlight).delete({ retiredAt: LessThan(highlightCutoff), }); diff --git a/src/entity/PostHighlight.ts b/src/entity/PostHighlight.ts index 6d5f0eb8c7..1f824f8015 100644 --- a/src/entity/PostHighlight.ts +++ b/src/entity/PostHighlight.ts @@ -51,17 +51,17 @@ export const toPostHighlightSignificanceLabel = ( } }; -@Entity() +@Entity('post_highlight') +@Index('UQ_post_highlight_post', ['postId'], { + unique: true, +}) @Index( - 'IDX_post_highlight_active_channel_highlightedAt', - ['channel', 'highlightedAt'], + 'IDX_post_highlight_active_significance_highlightedAt', + ['significance', 'highlightedAt'], { where: '"retiredAt" IS NULL', }, ) -@Index('UQ_post_highlight_channel_post', ['channel', 'postId'], { - unique: true, -}) @Index('IDX_post_highlight_retiredAt', ['retiredAt'], { where: '"retiredAt" IS NOT NULL', }) @@ -88,9 +88,6 @@ export class PostHighlight { }) significance: PostHighlightSignificance; - @Column({ type: 'text', nullable: true }) - reason: string | null; - @Column({ type: 'timestamp', nullable: true }) retiredAt: Date | null; diff --git a/src/entity/PostHighlightChannel.ts b/src/entity/PostHighlightChannel.ts new file mode 100644 index 0000000000..647f0e5cec --- /dev/null +++ b/src/entity/PostHighlightChannel.ts @@ -0,0 +1,41 @@ +import { + Column, + Entity, + Index, + JoinColumn, + ManyToOne, + PrimaryColumn, +} from 'typeorm'; +import { PostHighlight } from './PostHighlight'; + +@Entity('post_highlight_channel') +@Index( + 'IDX_post_highlight_channel_live_channel_placedAt', + ['channel', 'placedAt'], + { + where: '"retiredAt" IS NULL', + }, +) +@Index('IDX_post_highlight_channel_retiredAt', ['retiredAt'], { + where: '"retiredAt" IS NOT NULL', +}) +export class PostHighlightChannel { + @PrimaryColumn({ type: 'uuid' }) + highlightId: string; + + @PrimaryColumn({ type: 'text' }) + channel: string; + + @Column({ type: 'timestamp' }) + placedAt: Date; + + @Column({ type: 'timestamp', nullable: true }) + retiredAt: Date | null; + + @ManyToOne(() => PostHighlight, { + lazy: true, + onDelete: 'CASCADE', + }) + @JoinColumn({ name: 'highlightId' }) + highlight: Promise; +} diff --git a/src/entity/index.ts b/src/entity/index.ts index 48314add98..aab3ccf7b2 100644 --- a/src/entity/index.ts +++ b/src/entity/index.ts @@ -51,6 +51,7 @@ export * from './Quest'; export * from './QuestReward'; export * from './QuestRotation'; export * from './PostHighlight'; +export * from './PostHighlightChannel'; export * from './ChannelHighlightDefinition'; export * from './ChannelHighlightRun'; export * from './Archive'; diff --git a/src/migration/1776200000000-CanonicalHighlights.ts b/src/migration/1776200000000-CanonicalHighlights.ts new file mode 100644 index 0000000000..395a264d33 --- /dev/null +++ b/src/migration/1776200000000-CanonicalHighlights.ts @@ -0,0 +1,265 @@ +import type { MigrationInterface, QueryRunner } from 'typeorm'; + +export class CanonicalHighlights1776200000000 implements MigrationInterface { + name = 'CanonicalHighlights1776200000000'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + ALTER TABLE "post_highlight" + RENAME TO "post_highlight_legacy" + `); + + await queryRunner.query(` + ALTER TABLE "post_highlight_legacy" + RENAME CONSTRAINT "PK_post_highlight" TO "PK_post_highlight_legacy" + `); + + await queryRunner.query(` + ALTER TABLE "post_highlight_legacy" + RENAME CONSTRAINT "FK_post_highlight_post" TO "FK_post_highlight_legacy_post" + `); + + await queryRunner.query(` + ALTER INDEX "IDX_post_highlight_post" + RENAME TO "IDX_post_highlight_legacy_post" + `); + + await queryRunner.query(` + ALTER INDEX "IDX_post_highlight_retiredAt" + RENAME TO "IDX_post_highlight_legacy_retiredAt" + `); + + await queryRunner.query(` + ALTER INDEX "UQ_post_highlight_channel_post" + RENAME TO "UQ_post_highlight_legacy_channel_post" + `); + + await queryRunner.query(` + ALTER INDEX "IDX_post_highlight_active_channel_highlightedAt" + RENAME TO "IDX_post_highlight_legacy_active_channel_highlightedAt" + `); + + await queryRunner.query(` + CREATE TABLE "post_highlight" ( + "id" uuid NOT NULL DEFAULT uuid_generate_v4(), + "channel" text NOT NULL, + "postId" text NOT NULL, + "highlightedAt" TIMESTAMP NOT NULL, + "headline" text NOT NULL, + "significance" smallint NOT NULL DEFAULT 0, + "retiredAt" TIMESTAMP, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "PK_post_highlight" PRIMARY KEY ("id"), + CONSTRAINT "FK_post_highlight_post" + FOREIGN KEY ("postId") + REFERENCES "post"("id") + ON DELETE CASCADE + ON UPDATE NO ACTION, + CONSTRAINT "UQ_post_highlight_post" UNIQUE ("postId") + ) + `); + + await queryRunner.query(` + CREATE INDEX "IDX_post_highlight_post" + ON "post_highlight" ("postId") + `); + + await queryRunner.query(` + CREATE INDEX "IDX_post_highlight_active_significance_highlightedAt" + ON "post_highlight" ("significance", "highlightedAt" DESC) + WHERE "retiredAt" IS NULL + `); + + await queryRunner.query(` + CREATE INDEX "IDX_post_highlight_retiredAt" + ON "post_highlight" ("retiredAt") + WHERE "retiredAt" IS NOT NULL + `); + + await queryRunner.query(` + CREATE TABLE "post_highlight_channel" ( + "highlightId" uuid NOT NULL, + "channel" text NOT NULL, + "placedAt" TIMESTAMP NOT NULL, + "retiredAt" TIMESTAMP, + CONSTRAINT "PK_post_highlight_channel" PRIMARY KEY ("highlightId", "channel"), + CONSTRAINT "FK_post_highlight_channel_highlight" + FOREIGN KEY ("highlightId") + REFERENCES "post_highlight"("id") + ON DELETE CASCADE + ON UPDATE NO ACTION + ) + `); + + await queryRunner.query(` + CREATE INDEX "IDX_post_highlight_channel_live_channel_placedAt" + ON "post_highlight_channel" ("channel", "placedAt" DESC) + WHERE "retiredAt" IS NULL + `); + + await queryRunner.query(` + CREATE INDEX "IDX_post_highlight_channel_retiredAt" + ON "post_highlight_channel" ("retiredAt") + WHERE "retiredAt" IS NOT NULL + `); + + await queryRunner.query(` + WITH canonical_rows AS ( + SELECT DISTINCT ON (legacy."postId") + legacy."id", + legacy."channel", + legacy."postId", + legacy."highlightedAt", + legacy."headline", + legacy."significance", + CASE + WHEN bool_or(legacy."retiredAt" IS NULL) OVER ( + PARTITION BY legacy."postId" + ) + THEN NULL + ELSE max(legacy."retiredAt") OVER ( + PARTITION BY legacy."postId" + ) + END AS "retiredAt", + legacy."createdAt", + legacy."updatedAt" + FROM "post_highlight_legacy" AS legacy + ORDER BY + legacy."postId", + CASE WHEN legacy."retiredAt" IS NULL THEN 0 ELSE 1 END, + legacy."highlightedAt" DESC, + legacy."createdAt" DESC, + legacy."id" DESC + ) + INSERT INTO "post_highlight" ( + "id", + "channel", + "postId", + "highlightedAt", + "headline", + "significance", + "retiredAt", + "createdAt", + "updatedAt" + ) + SELECT + canonical."id", + canonical."channel", + canonical."postId", + canonical."highlightedAt", + canonical."headline", + canonical."significance", + canonical."retiredAt", + canonical."createdAt", + canonical."updatedAt" + FROM canonical_rows AS canonical + `); + + await queryRunner.query(` + INSERT INTO "post_highlight_channel" ( + "highlightId", + "channel", + "placedAt", + "retiredAt" + ) + SELECT + canonical."id", + legacy."channel", + legacy."highlightedAt", + legacy."retiredAt" + FROM "post_highlight_legacy" AS legacy + INNER JOIN "post_highlight" AS canonical + ON canonical."postId" = legacy."postId" + `); + + await queryRunner.query(` + DROP TABLE "post_highlight_legacy" + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + ALTER TABLE "post_highlight" + RENAME TO "post_highlight_canonical" + `); + + await queryRunner.query(` + CREATE TABLE "post_highlight" ( + "id" uuid NOT NULL DEFAULT uuid_generate_v4(), + "channel" text NOT NULL, + "postId" text NOT NULL, + "highlightedAt" TIMESTAMP NOT NULL, + "headline" text NOT NULL, + "significance" smallint NOT NULL DEFAULT 0, + "reason" text, + "retiredAt" TIMESTAMP, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + "updatedAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "PK_post_highlight" PRIMARY KEY ("id"), + CONSTRAINT "FK_post_highlight_post" + FOREIGN KEY ("postId") + REFERENCES "post"("id") + ON DELETE CASCADE + ON UPDATE NO ACTION + ) + `); + + await queryRunner.query(` + CREATE INDEX "IDX_post_highlight_post" + ON "post_highlight" ("postId") + `); + + await queryRunner.query(` + CREATE INDEX "IDX_post_highlight_active_channel_highlightedAt" + ON "post_highlight" ("channel", "highlightedAt" DESC) + WHERE "retiredAt" IS NULL + `); + + await queryRunner.query(` + CREATE UNIQUE INDEX "UQ_post_highlight_channel_post" + ON "post_highlight" ("channel", "postId") + `); + + await queryRunner.query(` + CREATE INDEX "IDX_post_highlight_retiredAt" + ON "post_highlight" ("retiredAt") + WHERE "retiredAt" IS NOT NULL + `); + + await queryRunner.query(` + INSERT INTO "post_highlight" ( + "id", + "channel", + "postId", + "highlightedAt", + "headline", + "significance", + "retiredAt", + "createdAt", + "updatedAt" + ) + SELECT + channel_row."highlightId", + channel_row."channel", + canonical."postId", + channel_row."placedAt", + canonical."headline", + canonical."significance", + channel_row."retiredAt", + canonical."createdAt", + canonical."updatedAt" + FROM "post_highlight_channel" AS channel_row + INNER JOIN "post_highlight_canonical" AS canonical + ON canonical."id" = channel_row."highlightId" + `); + + await queryRunner.query(` + DROP TABLE "post_highlight_channel" + `); + + await queryRunner.query(` + DROP TABLE "post_highlight_canonical" + `); + } +} diff --git a/src/notifications/types.ts b/src/notifications/types.ts index db66371c9d..0d740e5ef5 100644 --- a/src/notifications/types.ts +++ b/src/notifications/types.ts @@ -225,7 +225,6 @@ export type NotificationAchievementContext = NotificationBaseContext & { export type NotificationMajorHeadlineContext = NotificationPostContext & { headline: string; - channel: string; significance: number; }; diff --git a/src/routes/sitemaps.ts b/src/routes/sitemaps.ts index 05cfbb5db7..5124c00726 100644 --- a/src/routes/sitemaps.ts +++ b/src/routes/sitemaps.ts @@ -11,6 +11,7 @@ import { } from '../entity'; import { ChannelHighlightDefinition } from '../entity/ChannelHighlightDefinition'; import { PostHighlight } from '../entity/PostHighlight'; +import { PostHighlightChannel } from '../entity/PostHighlightChannel'; import { ArchivePeriodType, ArchiveScopeType } from '../common/archive'; import { getUserProfileUrl } from '../common/users'; import createOrGetConnection from '../db'; @@ -335,12 +336,17 @@ const buildHighlightsSitemapQuery = ( source .createQueryBuilder() .select('chd.channel', 'channel') - .addSelect('MAX(ph."highlightedAt")', 'lastmod') + .addSelect('MAX(phc."placedAt")', 'lastmod') .from(ChannelHighlightDefinition, 'chd') + .leftJoin( + PostHighlightChannel, + 'phc', + 'phc.channel = chd.channel AND phc."retiredAt" IS NULL', + ) .leftJoin( PostHighlight, 'ph', - 'ph.channel = chd.channel AND ph."retiredAt" IS NULL', + 'ph.id = phc."highlightId" AND ph."retiredAt" IS NULL', ) .where('chd.mode != :disabledMode', { disabledMode: 'disabled' }) .groupBy('chd.channel') diff --git a/src/schema/highlights.ts b/src/schema/highlights.ts index a27c62c92c..2e99f15532 100644 --- a/src/schema/highlights.ts +++ b/src/schema/highlights.ts @@ -14,6 +14,7 @@ import { PostHighlight, PostHighlightSignificance, } from '../entity/PostHighlight'; +import { PostHighlightChannel } from '../entity/PostHighlightChannel'; import type { GQLSource } from './sources'; type GQLChannelDigestConfiguration = { @@ -108,28 +109,18 @@ const getMajorHeadlinesPage = (args: ConnectionArguments): OffsetPage => ({ offset: getOffsetWithDefault(args.after, -1) + 1, }); -const addMajorHeadlineFilter = ( - builder: SelectQueryBuilder, -): SelectQueryBuilder => +const addMajorHeadlineFilter = ({ + builder, + alias, +}: { + builder: SelectQueryBuilder; + alias: string; +}): SelectQueryBuilder => builder - .where('highlight.significance IN (:...significances)', { + .where(`${alias}.significance IN (:...significances)`, { significances: majorHeadlineSignificances, }) - .andWhere('highlight."retiredAt" IS NULL'); - -const getDedupedMajorHeadlinesQuery = ( - queryBuilder: SelectQueryBuilder, -) => - addMajorHeadlineFilter( - queryBuilder - .subQuery() - .select('highlight.id', 'id') - .from(PostHighlight, 'highlight'), - ) - .distinctOn(['highlight.postId']) - .orderBy('highlight.postId', 'ASC') - .addOrderBy('highlight.highlightedAt', 'DESC') - .addOrderBy('highlight.id', 'DESC'); + .andWhere(`${alias}."retiredAt" IS NULL`); export const resolvers: IResolvers = { Query: { @@ -149,20 +140,33 @@ export const resolvers: IResolvers = { true, ), postHighlights: async (_, args: { channel: string }, ctx: Context, info) => - graphorm.query( - ctx, - info, - (builder) => { - builder.queryBuilder - .where(`"${builder.alias}"."channel" = :channel`, { - channel: args.channel, - }) - .andWhere(`"${builder.alias}"."retiredAt" IS NULL`) - .orderBy(`"${builder.alias}"."highlightedAt"`, 'DESC'); - return builder; - }, - true, - ), + graphorm + .query( + ctx, + info, + (builder) => { + builder.queryBuilder + .innerJoin( + PostHighlightChannel, + 'placement', + `placement."highlightId" = "${builder.alias}".id`, + ) + .where('placement.channel = :channel', { + channel: args.channel, + }) + .andWhere('placement."retiredAt" IS NULL') + .andWhere(`"${builder.alias}"."retiredAt" IS NULL`) + .orderBy('placement."placedAt"', 'DESC'); + return builder; + }, + true, + ) + .then((items) => + items.map((item) => ({ + ...(item as Record), + channel: args.channel, + })), + ), majorHeadlines: async ( _, args: ConnectionArguments, @@ -178,17 +182,10 @@ export const resolvers: IResolvers = { (nodeSize) => nodeSize >= page.limit, (_, index) => offsetToCursor(page.offset + index), (builder) => { - const dedupedIdsQuery = getDedupedMajorHeadlinesQuery( - builder.queryBuilder as SelectQueryBuilder, - ); - - builder.queryBuilder - .innerJoin( - `(${dedupedIdsQuery.getQuery()})`, - 'deduped', - `deduped.id = ${builder.alias}.id`, - ) - .setParameters(dedupedIdsQuery.getParameters()) + addMajorHeadlineFilter({ + builder: builder.queryBuilder as SelectQueryBuilder, + alias: builder.alias, + }) .orderBy(`${builder.alias}."highlightedAt"`, 'DESC') .addOrderBy(`${builder.alias}."id"`, 'DESC') .offset(page.offset) diff --git a/src/workers/cdc/primary.ts b/src/workers/cdc/primary.ts index 06a5f21e07..5f0845cd17 100644 --- a/src/workers/cdc/primary.ts +++ b/src/workers/cdc/primary.ts @@ -1,8 +1,4 @@ -import { - OpportunityState, - OpportunityType, - PostHighlightedMessage, -} from '@dailydotdev/schema'; +import { OpportunityState, OpportunityType } from '@dailydotdev/schema'; import { Alerts, Banner, @@ -48,7 +44,6 @@ import { UserStreak, UserTopReader, Feedback, - PostHighlight, } from '../../entity'; import { BookmarkList } from '../../entity/BookmarkList'; import { HotTake } from '../../entity/user/HotTake'; @@ -2468,33 +2463,6 @@ const onFeedbackChange = async ( } }; -const onPostHighlightChange = async ( - con: DataSource, - logger: FastifyBaseLogger, - data: ChangeMessage, -) => { - if (data.payload.op !== 'c' || !data.payload.after) { - return; - } - - const { id, channel, postId, headline, significance, reason, highlightedAt } = - data.payload.after; - - await triggerTypedEvent( - logger, - 'api.v1.post-highlighted', - new PostHighlightedMessage({ - highlightId: id, - channel, - postId, - headline, - significance, - reason: reason ?? undefined, - highlightedAt, - }), - ); -}; - const onHotTakeChange = async ( con: DataSource, logger: FastifyBaseLogger, @@ -2764,9 +2732,6 @@ const worker: Worker = { case getTableName(con, Feedback): await onFeedbackChange(con, logger, data); break; - case getTableName(con, PostHighlight): - await onPostHighlightChange(con, logger, data); - break; case getTableName(con, HotTake): await onHotTakeChange(con, logger, data); break; diff --git a/src/workers/generateChannelHighlight.ts b/src/workers/generateChannelHighlight.ts index 93ea81b8a4..cbd58de8e1 100644 --- a/src/workers/generateChannelHighlight.ts +++ b/src/workers/generateChannelHighlight.ts @@ -1,74 +1,62 @@ import { ONE_DAY_IN_SECONDS, ONE_MINUTE_IN_SECONDS } from '../common/constants'; -import { getChannelHighlightDefinitionByChannel } from '../common/channelHighlight/definitions'; -import { generateChannelHighlight } from '../common/channelHighlight/generate'; +import { generateHighlights } from '../common/channelHighlight/generate'; +import { triggerTypedEvent } from '../common/typedPubsub'; import { TypedWorker } from './worker'; import { withRedisDoneLock } from './withRedisDoneLock'; -const CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS = 10 * ONE_MINUTE_IN_SECONDS; -const CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS = 2 * ONE_DAY_IN_SECONDS; +const HIGHLIGHTS_LOCK_TTL_SECONDS = 10 * ONE_MINUTE_IN_SECONDS; +const HIGHLIGHTS_DONE_TTL_SECONDS = 2 * ONE_DAY_IN_SECONDS; -const getChannelHighlightDoneKey = ({ - channel, +const getHighlightsDoneKey = ({ scheduledAt, }: { - channel: string; scheduledAt: string; -}): string => `channel-highlight:done:${channel}:${scheduledAt}`; +}): string => `highlights:done:${scheduledAt}`; -const getChannelHighlightLockKey = ({ - channel, +const getHighlightsLockKey = ({ scheduledAt, }: { - channel: string; scheduledAt: string; -}): string => `channel-highlight:lock:${channel}:${scheduledAt}`; +}): string => `highlights:lock:${scheduledAt}`; -const worker: TypedWorker<'api.v1.generate-channel-highlight'> = { - subscription: 'api.generate-channel-highlight', +const worker: TypedWorker<'api.v1.generate-highlights'> = { + subscription: 'api.generate-highlights-v2', handler: async (message, con, logger): Promise => { - const { channel, scheduledAt } = message.data; - const logDetails = { channel, scheduledAt, messageId: message.messageId }; - const definition = await getChannelHighlightDefinitionByChannel({ - con, - channel, - }); - - if (!definition) { - logger.error(logDetails, 'Channel highlight definition not found'); - return; - } - + const { scheduledAt } = message.data; + const logDetails = { scheduledAt, messageId: message.messageId }; const now = new Date(scheduledAt); + if (Number.isNaN(now.getTime())) { - logger.error(logDetails, 'Channel highlight scheduledAt is invalid'); + logger.error(logDetails, 'Highlight scheduledAt is invalid'); return; } try { await withRedisDoneLock({ - doneKey: getChannelHighlightDoneKey({ - channel, + doneKey: getHighlightsDoneKey({ scheduledAt, }), - lockKey: getChannelHighlightLockKey({ - channel, + lockKey: getHighlightsLockKey({ scheduledAt, }), - lockValue: message.messageId || channel, - lockTtlSeconds: CHANNEL_HIGHLIGHT_LOCK_TTL_SECONDS, - doneTtlSeconds: CHANNEL_HIGHLIGHT_DONE_TTL_SECONDS, - execute: () => - generateChannelHighlight({ + lockValue: message.messageId || scheduledAt, + lockTtlSeconds: HIGHLIGHTS_LOCK_TTL_SECONDS, + doneTtlSeconds: HIGHLIGHTS_DONE_TTL_SECONDS, + execute: async () => { + const { createdHighlights } = await generateHighlights({ con, - definition, now, - }).then(() => undefined), + }); + + await Promise.all( + createdHighlights.map((highlight) => + triggerTypedEvent(logger, 'api.v1.highlight-created', highlight), + ), + ); + }, }); } catch (err) { - logger.error( - { ...logDetails, err }, - 'Failed to generate channel highlight', - ); + logger.error({ ...logDetails, err }, 'Failed to generate highlights'); throw err; } }, diff --git a/src/workers/majorHighlightTweet.ts b/src/workers/majorHighlightTweet.ts index 0bb4823f6a..6bba5b532b 100644 --- a/src/workers/majorHighlightTweet.ts +++ b/src/workers/majorHighlightTweet.ts @@ -1,4 +1,3 @@ -import { PostHighlightedMessage } from '@dailydotdev/schema'; import { ONE_DAY_IN_SECONDS, ONE_MINUTE_IN_SECONDS } from '../common/constants'; import { PostHighlightSignificance } from '../entity/PostHighlight'; import { getTwitterClient } from '../integrations/twitter/clients'; @@ -51,9 +50,8 @@ const withMajorHighlightTweetLock = ({ execute, }); -const worker: TypedWorker<'api.v1.post-highlighted'> = { - subscription: 'api.major-highlight-tweet', - parseMessage: (message) => PostHighlightedMessage.fromBinary(message.data), +const worker: TypedWorker<'api.v1.highlight-created'> = { + subscription: 'api.major-highlight-tweet-v2', handler: async ({ data, messageId }, _con, logger): Promise => { const { headline, highlightId, postId, significance } = data; diff --git a/src/workers/newHighlightRealTime.ts b/src/workers/newHighlightRealTime.ts index 16b60730c4..27f1610be5 100644 --- a/src/workers/newHighlightRealTime.ts +++ b/src/workers/newHighlightRealTime.ts @@ -1,16 +1,25 @@ -import { PostHighlightedMessage } from '@dailydotdev/schema'; import { NEW_HIGHLIGHT_CHANNEL } from '../common/highlights'; import { PostHighlight } from '../entity/PostHighlight'; +import { PostHighlightChannel } from '../entity/PostHighlightChannel'; import { redisPubSub } from '../redis'; import { TypedWorker } from './worker'; +import { IsNull } from 'typeorm'; -const worker: TypedWorker<'api.v1.post-highlighted'> = { - subscription: 'api.new-highlight-real-time', +const worker: TypedWorker<'api.v1.highlight-created'> = { + subscription: 'api.new-highlight-real-time-v2', handler: async (message, con, logger): Promise => { const { highlightId } = message.data; - const highlight = await con.getRepository(PostHighlight).findOne({ - where: { id: highlightId }, - }); + const [highlight, placements] = await Promise.all([ + con.getRepository(PostHighlight).findOne({ + where: { id: highlightId }, + }), + con.getRepository(PostHighlightChannel).find({ + where: { + highlightId, + retiredAt: IsNull(), + }, + }), + ]); if (!highlight) { logger.error( @@ -23,15 +32,19 @@ const worker: TypedWorker<'api.v1.post-highlighted'> = { const post = await highlight.post; const source = await post.source; - await redisPubSub.publish(NEW_HIGHLIGHT_CHANNEL, { - ...highlight, - post: { - ...post, - source, - }, - }); + await Promise.all( + placements.map((placement) => + redisPubSub.publish(NEW_HIGHLIGHT_CHANNEL, { + ...highlight, + channel: placement.channel, + post: { + ...post, + source, + }, + }), + ), + ); }, - parseMessage: (message) => PostHighlightedMessage.fromBinary(message.data), }; export default worker; diff --git a/src/workers/notifications/majorHeadlineAdded.ts b/src/workers/notifications/majorHeadlineAdded.ts index f160348c47..e6df6329ac 100644 --- a/src/workers/notifications/majorHeadlineAdded.ts +++ b/src/workers/notifications/majorHeadlineAdded.ts @@ -1,5 +1,4 @@ import { TypedNotificationWorker } from '../worker'; -import { PostHighlightedMessage } from '@dailydotdev/schema'; import { NotificationType } from '../../notifications/common'; import { PostHighlightSignificance } from '../../entity/PostHighlight'; import { PostType, User } from '../../entity'; @@ -16,12 +15,11 @@ const blockedPostTypes = new Set([ const streamConcurrency = 10; -export const majorHeadlineAdded: TypedNotificationWorker<'api.v1.post-highlighted'> = +export const majorHeadlineAdded: TypedNotificationWorker<'api.v1.highlight-created'> = { - subscription: 'api.major-headline-added-notification', - parseMessage: (message) => PostHighlightedMessage.fromBinary(message.data), + subscription: 'api.major-headline-added-notification-v2', handler: async (data, con) => { - const { postId, headline, channel, significance } = data; + const { postId, headline, significance } = data; if (significance !== PostHighlightSignificance.Breaking) { return; @@ -75,7 +73,6 @@ export const majorHeadlineAdded: TypedNotificationWorker<'api.v1.post-highlighte ...baseCtx, userIds, headline, - channel, significance, } as NotificationMajorHeadlineContext, },