ENG-365 Add metrics for search and ingestion by leite08 · Pull Request #3912 · metriport/metriport · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
8000

ENG-365 Add metrics for search and ingestion #3912

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { makeLocation } from "../../../../../external/fhir/__tests__/location";
import { makePatient, PatientWithId } from "../../../../../external/fhir/__tests__/patient";
import { makeReference } from "../../../../../external/fhir/__tests__/reference";
import { FhirSearchResult } from "../../../../../external/opensearch/index-based-on-fhir";
import { OpenSearchFhirSearcher } from "../../../../../external/opensearch/lexical/fhir-searcher";
import { OpenSearchConsolidatedSearcher } from "../../../../../external/opensearch/lexical/fhir-searcher";
import { getEntryId as getEntryIdFromOpensearch } from "../../../../../external/opensearch/shared/id";
import { makeCondition } from "../../../../../fhir-to-cda/cda-templates/components/__tests__/make-condition";
import {
Expand All @@ -29,7 +29,7 @@ describe("search-consolidated", () => {
patient = makePatient();
patientId = patient.id;
getByIds_mock = jest
.spyOn(OpenSearchFhirSearcher.prototype, "getByIds")
.spyOn(OpenSearchConsolidatedSearcher.prototype, "getByIds")
.mockResolvedValue([]);
toEntryId = makeToEntryId(cxId, patientId);
toGetByIdsResultEntry = makeToGetByIdsResultEntry(cxId, patientId, toEntryId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { OpenSearchFhirSearcherConfig } from "../../../../external/opensearch/lexical/fhir-searcher";
import { OpenSearchConsolidatedSearcherConfig } from "../../../../external/opensearch/lexical/fhir-searcher";
import { Config } from "../../../../util/config";

export function getConfigs(): OpenSearchFhirSearcherConfig {
export function getConfigs(): OpenSearchConsolidatedSearcherConfig {
return {
region: Config.getAWSRegion(),
endpoint: Config.getSearchEndpoint(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
import { buildDayjs } from "@metriport/shared/common/date";
import { sleep } from "@metriport/shared/common/sleep";
import { Features } from "../../../../domain/features";
import { Patient } from "../../../../domain/patient";
import { OpenSearchFhirSearcher } from "../../../../external/opensearch/lexical/fhir-searcher";
import { CloudWatchUtils, Metrics } from "../../../../external/aws/cloudwatch";
import { OpenSearchConsolidatedSearcher } from "../../../../external/opensearch/lexical/fhir-searcher";
import { Config } from "../../../../util/config";
import { getConfigs } from "./fhir-config";
import { IngestConsolidatedDirect } from "./ingest-consolidated-direct";

const WAIT_AFTER_INGESTION_IN_MILLIS = 1_000;
const consolidatedDataIngestionInitialDate = Config.getConsolidatedDataIngestionInitialDate();

const cloudWatchUtils = new CloudWatchUtils(
Config.getAWSRegion(),
Features.ConsolidatedIngestIfNeeded
);

/**
* Ingests consolidated data if needed and applicable to the patient.
*
Expand All @@ -28,16 +35,27 @@ export async function ingestIfNeeded(patient: Patient): Promise<void> {
) {
return;
}
const metrics: Metrics = {};
const startedAt = Date.now();

const { cxId, id: patientId } = patient;

const searchService = new OpenSearchFhirSearcher(getConfigs());
let localStartedAt = Date.now();
const searchService = new OpenSearchConsolidatedSearcher(getConfigs());
const isIngested = await searchService.hasData({ cxId, patientId });
let elapsedTime = Date.now() - localStartedAt;
metrics.ingestIfNeeded_checkHasData = { duration: elapsedTime, timestamp: new Date() };

if (!isIngested) {
localStartedAt = Date.now();
const ingestor = new IngestConsolidatedDirect();
await ingestor.ingestConsolidatedIntoSearchEngine({ cxId, patientId });
elapsedTime = Date.now() - localStartedAt;
metrics.ingestIfNeeded_ingest = { duration: elapsedTime, timestamp: new Date() };

await sleep(WAIT_AFTER_INGESTION_IN_MILLIS);
}

metrics.ingestIfNeeded_total = { duration: Date.now() - startedAt, timestamp: new Date() };
await cloudWatchUtils.reportMetrics(metrics);
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import { Bundle, Resource } from "@medplum/fhirtypes";
import { MetriportError } from "@metriport/shared";
import { timed } from "@metriport/shared/util/duration";
import { Features } from "../../../../domain/features";
import { Patient } from "../../../../domain/patient";
import { CloudWatchUtils, Metrics, withMetrics } from "../../../../external/aws/cloudwatch";
import { normalize } from "../../../../external/fhir/consolidated/normalize";
import { OpenSearchFhirIngestor } from "../../../../external/opensearch/fhir-ingestor";
import { OnBulkItemError } from "../../../../external/opensearch/shared/bulk";
import { out } from "../../../../util";
import { Config } from "../../../../util/config";
import { getConsolidatedFile } from "../../consolidated-get";
import { getConfigs } from "./fhir-config";

const cloudWatchUtils = new CloudWatchUtils(Config.getAWSRegion(), Features.ConsolidatedIngestion);

/**
* Ingest a patient's consolidated resources into OpenSearch for lexical search.
*
Expand All @@ -23,14 +27,23 @@ export async function ingestPatientConsolidated({
}): Promise<void> {
const { cxId, id: patientId } = patient;
const { log } = out(`ingestPatientConsolidated - cx ${cxId}, pt ${patientId}`);
const metrics: Metrics = {};
const startedAt = Date.now();

const ingestor = new OpenSearchFhirIngestor(getConfigs());

log("Getting consolidated and cleaning up the index...");
const localStartedAt = Date.now();
const [bundle] = await Promise.all([
timed(() => getConsolidatedBundle({ cxId, patientId }), "getConsolidatedBundle", log),
ingestor.delete({ cxId, patientId }),
withMetrics(
() => getConsolidatedBundle({ cxId, patientId }),
"ingest_getConsolidatedBundle",
metrics,
log
),
withMetrics(() => ingestor.delete({ cxId, patientId }), "ingest_deleteIngested", metrics, log),
]);
metrics.ingest_preIngest = { duration: Date.now() - localStartedAt, timestamp: new Date() };

const resources =
bundle.entry?.flatMap(entry => {
Expand All @@ -41,14 +54,15 @@ export async function ingestPatientConsolidated({
}) ?? [];

log("Done, calling ingestBulk...");
const startedAt = Date.now();
const errors = await ingestor.ingestBulk({
cxId,
patientId,
resources,
onItemError,
});
const errors = await withMetrics(
() => ingestor.ingestBulk({ cxId, patientId, resources, onItemError }),
"ingest_ingestBulk",
metrics
);

const elapsedTime = Date.now() - startedAt;
metrics.ingest_total = { duration: elapsedTime, timestamp: new Date() };
await cloudWatchUtils.reportMetrics(metrics);

if (errors.size > 0) processErrors({ cxId, patientId, errors, log });

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Resource } from "@medplum/fhirtypes";
import { errorToString } from "@metriport/shared";
import { elapsedTimeFromNow } from "@metriport/shared/common/date";
import { SearchSetBundle } from "@metriport/shared/medical";
import { timed } from "@metriport/shared/util/duration";
import { uniq } from "lodash";
import { Features } from "../../../../domain/features";
import { Patient } from "../../../../domain/patient";
import { CloudWatchUtils, Metrics, withMetrics } from "../../../../external/aws/cloudwatch";
import { toFHIR as patientToFhir } from "../../../../external/fhir/patient/conversion";
import {
buildBundleEntry,
Expand All @@ -15,14 +15,17 @@ import {
FhirSearchResult,
rawContentFieldName,
} from "../../../../external/opensearch/index-based-on-fhir";
import { OpenSearchFhirSearcher } from "../../../../external/opensearch/lexical/fhir-searcher";
import { OpenSearchConsolidatedSearcher } from "../../../../external/opensearch/lexical/fhir-searcher";
import { getEntryId } from "../../../../external/opensearch/shared/id";
import { out } from "../../../../util";
import { Config } from "../../../../util/config";
import { searchDocuments } from "../document-reference/search";
import { getConfigs } from "./fhir-config";

const maxHydrationAttempts = 5;

const cloudWatchUtils = new CloudWatchUtils(Config.getAWSRegion(), Features.ConsolidatedSearch);

export type SearchConsolidatedParams = {
patient: Patient;
query: string | undefined;
Expand Down Expand Up @@ -55,10 +58,11 @@ export async function searchPatientConsolidated({
const { log } = out(`searchPatientConsolidated - cx ${patient.cxId}, pt ${patient.id}`);

log(`Getting consolidated and searching OS...`);
const startedAt = new Date();
const metrics: Metrics = {};
const startedAt = Date.now();

const searchFhirResourcesPromise = () =>
searchFhirResources({
const searchConsolidatedPromise = () =>
searchConsolidated({
cxId: patient.cxId,
patientId: patient.id,
query,
Expand All @@ -67,52 +71,47 @@ export async function searchPatientConsolidated({
const searchDocumentsPromise = () =>
searchDocuments({ cxId: patient.cxId, patientId: patient.id, contentFilter: query });

let localStartedAt = Date.now();
const [fhirResourcesResults, docRefResults] = await Promise.all([
timed(searchFhirResourcesPromise, "searchFhirResources", log),
timed(searchDocumentsPromise, "searchDocuments", log),
withMetrics(searchConsolidatedPromise, "search_consolidated", metrics, log),
withMetrics(searchDocumentsPromise, "search_documents", metrics, log),
]);

let elapsedTime = Date.now() - localStartedAt;
metrics.search_subTotal = { duration: elapsedTime, timestamp: new Date() };
log(
`Got ${fhirResourcesResults.length} resources and ${
docRefResults.length
} DocRefs in ${elapsedTimeFromNow(startedAt)} ms`
`Got ${fhirResourcesResults.length} resources and ${docRefResults.length} DocRefs in ${elapsedTime} ms, hydrating search results...`
);

let subStartedAt = new Date();
const resourcesMutable = fhirResourcesResults.flatMap(
r => fhirSearchResultToResource(r, log) ?? []
);
resourcesMutable.push(...docRefResults);
log(
`Loaded/converted ${resourcesMutable.length} resources in ${elapsedTimeFromNow(
subStartedAt
)} ms, hydrating search results...`
);

subStartedAt = new Date();
localStartedAt = Date.now();
const hydratedMutable = await hydrateMissingReferences({
cxId: patient.cxId,
patientId: patient.id,
resources: resourcesMutable,
});
log(`Hydrated to ${hydratedMutable.length} resources in ${elapsedTimeFromNow(subStartedAt)} ms.`);
elapsedTime = Date.now() - localStartedAt;
metrics.search_hydrate = { duration: elapsedTime, timestamp: new Date() };
log(`Hydrated to ${hydratedMutable.length} resources in ${elapsedTime} ms.`);

const patientResource = patientToFhir(patient);
hydratedMutable.push(patientResource);

const entries = hydratedMutable.map(buildBundleEntry);
const resultBundle = buildSearchSetBundle(entries);

log(
`Done in ${elapsedTimeFromNow(startedAt)} ms, returning ${
resultBundle.entry?.length
} resources...`
);
elapsedTime = Date.now() - startedAt;
metrics.search_total = { duration: elapsedTime, timestamp: new Date() };
await cloudWatchUtils.reportMetrics(metrics);
log(`Done in ${elapsedTime} ms, returning ${resultBundle.entry?.length} resources...`);

return resultBundle;
}

async function searchFhirResources({
async function searchConsolidated({
cxId,
patientId,
query,
Expand All @@ -121,7 +120,7 @@ async function searchFhirResources({
patientId: string;
query: string;
}): Promise<FhirSearchResult[]> {
const searchService = new OpenSearchFhirSearcher(getConfigs());
const searchService = new OpenSearchConsolidatedSearcher(getConfigs());
return await searchService.search({
cxId,
patientId,
Expand Down Expand Up @@ -152,7 +151,7 @@ export async function hydrateMissingReferences({

const uniqueIds = uniq(missingRefIds);

const searchService = new OpenSearchFhirSearcher(getConfigs());
const searchService 9091 = new OpenSearchConsolidatedSearcher(getConfigs());
const openSearchResults = await searchService.getByIds({
cxId,
patientId,
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/domain/features.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
 * Feature identifiers for the consolidated search/ingestion flows.
 *
 * Each module that reports metrics passes one of these values as the second
 * argument to `new CloudWatchUtils(Config.getAWSRegion(), feature)`.
 * NOTE(review): presumably CloudWatchUtils uses the value to namespace or
 * dimension the reported metrics — confirm there before renaming a value.
 */
export enum Features {
/** Feature of the reporter used by `searchPatientConsolidated` (the `search_*` metrics). */
ConsolidatedSearch = "consolidated-search",
/** Feature of the reporter used by `ingestPatientConsolidated` (the `ingest_*` metrics). */
ConsolidatedIngestion = "consolidated-ingestion",
/** Feature of the reporter used by `ingestIfNeeded` (the `ingestIfNeeded_*` metrics). */
ConsolidatedIngestIfNeeded = "consolidated-ingest-if-needed",
/**
 * Not referenced in the code visible here; presumably reported by the
 * `OpenSearchConsolidatedSearcher` class itself — TODO confirm.
 */
OpenSearchConsolidatedSearcher = "consolidated-search-open-search",
}
Loading
0