Skip to content

Commit 6767c21

Browse files
committed
feat: support D1-backed crowd reports
1 parent c8eac26 commit 6767c21

5 files changed

Lines changed: 336 additions & 123 deletions

File tree

app/util/crowdReportDispatch.test.ts

Lines changed: 55 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,11 @@ function makeFakeDispatchUpdateDb(
131131
},
132132
};
133133
}
134-
return Promise.resolve();
134+
return {
135+
returning() {
136+
return Promise.resolve(currentReportRows);
137+
},
138+
};
135139
},
136140
};
137141
},
@@ -211,10 +215,17 @@ function makeFakeDispatchEligibilityDb() {
211215
}
212216

213217
function makeFakePostSendMarkMissDb() {
218+
let inTransaction = false;
214219
const updateSets: unknown[] = [];
215220
const whereCalls: unknown[] = [];
216221
let selectCount = 0;
217222
let updateCount = 0;
223+
const updateRowsByIndex = [
224+
[{ id: 'cluster-1' }],
225+
[{ id: 'stale-report-1' }],
226+
[],
227+
[{ id: 'stale-report-1' }],
228+
];
218229
const makeSelectBuilder = () => {
219230
const selectIndex = selectCount;
220231
selectCount += 1;
@@ -245,41 +256,46 @@ function makeFakePostSendMarkMissDb() {
245256
if (updateIndex === 0) {
246257
return {
247258
returning() {
248-
return Promise.resolve([{ id: 'cluster-1' }]);
249-
},
250-
};
251-
}
252-
if (updateIndex === 1) {
253-
return {
254-
returning() {
255-
return Promise.resolve([]);
259+
return Promise.resolve(updateRowsByIndex[updateIndex] ?? []);
256260
},
257261
};
258262
}
259-
return Promise.resolve();
263+
return {
264+
returning() {
265+
return Promise.resolve(updateRowsByIndex[updateIndex] ?? []);
266+
},
267+
};
260268
},
261269
};
262270
},
263271
};
264272

265273
return {
274+
get inTransaction() {
275+
return inTransaction;
276+
},
266277
updateSets,
267278
whereCalls,
268279
db: {
269-
transaction<T>(
280+
async transaction<T>(
270281
callback: (transaction: {
271282
select: () => ReturnType<typeof makeSelectBuilder>;
272283
update: () => typeof updateBuilder;
273284
}) => Promise<T>,
274285
) {
275-
return callback({
276-
select() {
277-
return makeSelectBuilder();
278-
},
279-
update() {
280-
return updateBuilder;
281-
},
282-
});
286+
inTransaction = true;
287+
try {
288+
return await callback({
289+
select() {
290+
return makeSelectBuilder();
291+
},
292+
update() {
293+
return updateBuilder;
294+
},
295+
});
296+
} finally {
297+
inTransaction = false;
298+
}
283299
},
284300
},
285301
};
@@ -405,6 +421,8 @@ describe('getDispatchableCrowdReportCandidates', () => {
405421
expect(whereSql).toContain('crowd_report_cluster_stations');
406422
expect(whereSql).toContain('still_happening');
407423
expect(whereSql).toContain('count(distinct');
424+
expect(whereSql).toContain('"crowd_report_clusters"."dispatched_at"');
425+
expect(whereSql).toContain('"crowd_reports"."dispatch_error"');
408426
});
409427

410428
it('requires single-report affected-area scope before applying the result limit', async () => {
@@ -421,6 +439,7 @@ describe('getDispatchableCrowdReportCandidates', () => {
421439

422440
expect(whereSql).toContain('crowd_report_lines');
423441
expect(whereSql).toContain('crowd_report_stations');
442+
expect(whereSql).toContain('"crowd_reports"."dispatch_error"');
424443
});
425444

426445
it('builds cluster dispatch payloads from ongoing reports only', async () => {
@@ -481,9 +500,8 @@ describe('getDispatchableCrowdReportCandidates', () => {
481500
expect(clusterUpdateWhereSql).not.toContain('"crowd_reports"."id" not in');
482501
expect(clusterUpdateWhereSql).not.toContain('"crowd_reports"."id" in');
483502
expect(reportUpdateWhereSql).toContain('"crowd_reports"."id" in');
484-
expect(reportUpdateWhereSql).not.toContain(
485-
'"crowd_reports"."cluster_id" =',
486-
);
503+
expect(reportUpdateWhereSql).toContain('"crowd_reports"."cluster_id" =');
504+
expect(reportUpdateWhereSql).toContain('"crowd_reports"."dispatch_error"');
487505
});
488506

489507
it('does not mark cluster payload reports when the cluster freshness update misses', async () => {
@@ -554,30 +572,30 @@ describe('getDispatchableCrowdReportCandidates', () => {
554572

555573
const dialect = new PgDialect();
556574
const lockWhereSql = dialect.sqlToQuery(fake.whereCalls[0] as SQL).sql;
557-
const eligibilityWhereSql = dialect.sqlToQuery(
558-
fake.whereCalls[1] as SQL,
559-
).sql;
560575

561576
expect(fetchImpl).not.toHaveBeenCalled();
562577
expect(lockWhereSql).toContain('"crowd_report_clusters"."id" =');
563578
expect(lockWhereSql).toContain('"crowd_report_clusters"."status" =');
564579
expect(lockWhereSql).toContain(
565580
'"crowd_report_clusters"."dispatched_at" is null',
566581
);
567-
expect(eligibilityWhereSql).toContain('"still_happening" is true');
568-
expect(eligibilityWhereSql).toContain('count(*)');
569-
expect(eligibilityWhereSql).not.toContain('::int');
570-
expect(eligibilityWhereSql).not.toContain('"crowd_reports"."id" not in');
571-
expect(eligibilityWhereSql).not.toContain('"crowd_reports"."id" in');
582+
expect(lockWhereSql).toContain('"still_happening" is true');
583+
expect(lockWhereSql).toContain('count(*)');
584+
expect(lockWhereSql).not.toContain('::int');
585+
expect(lockWhereSql).not.toContain('"crowd_reports"."id" not in');
586+
expect(lockWhereSql).not.toContain('"crowd_reports"."id" in');
572587
});
573588

574589
it('reports a post-send stale local success mark as failed without closing the cluster', async () => {
575590
const fake = makeFakePostSendMarkMissDb();
576-
const fetchImpl = vi.fn().mockResolvedValue(
577-
new Response(null, {
578-
status: 204,
579-
}),
580-
);
591+
const fetchImpl = vi.fn().mockImplementation(() => {
592+
expect(fake.inTransaction).toBe(false);
593+
return Promise.resolve(
594+
new Response(null, {
595+
status: 204,
596+
}),
597+
);
598+
});
581599

582600
await expect(
583601
dispatchPendingCrowdReports(
@@ -626,8 +644,8 @@ describe('getDispatchableCrowdReportCandidates', () => {
626644
});
627645

628646
expect(fetchImpl).toHaveBeenCalledTimes(1);
629-
expect(fake.updateSets).toHaveLength(3);
630-
expect(fake.updateSets[2]).toMatchObject({
647+
expect(fake.updateSets).toHaveLength(5);
648+
expect(fake.updateSets[4]).toMatchObject({
631649
dispatch_error:
632650
'Crowd report dispatch was sent, but local success marking became stale',
633651
});

0 commit comments

Comments
 (0)