From 212684d8403eecad9b0f0495df79d6ad7b9305df Mon Sep 17 00:00:00 2001 From: Rafael Leite <2132564+leite08@users.noreply.github.com> Date: Thu, 29 May 2025 13:33:36 -0300 Subject: [PATCH 1/3] feat: add metrics for ingestion and search Ref eng-365 Signed-off-by: Rafael Leite <2132564+leite08@users.noreply.github.com> --- .../__tests__/search-consolidated.test.ts | 4 +- .../search/fhir-resource/fhir-config.ts | 4 +- .../search/fhir-resource/ingest-if-needed.ts | 22 ++- .../search/fhir-resource/ingest-lexical.ts | 34 +++- .../fhir-resource/search-consolidated.ts | 48 ++--- packages/core/src/domain/features.ts | 6 + packages/core/src/external/aws/cloudwatch.ts | 172 ++++++++++++++++++ .../opensearch/lexical/fhir-searcher.ts | 62 +++++-- packages/core/src/util/units.ts | 7 + packages/lambdas/src/shared/cloudwatch.ts | 10 + packages/lambdas/src/shared/units.ts | 15 +- 11 files changed, 329 insertions(+), 55 deletions(-) create mode 100644 packages/core/src/domain/features.ts create mode 100644 packages/core/src/external/aws/cloudwatch.ts create mode 100644 packages/core/src/util/units.ts diff --git a/packages/core/src/command/consolidated/search/fhir-resource/__tests__/search-consolidated.test.ts b/packages/core/src/command/consolidated/search/fhir-resource/__tests__/search-consolidated.test.ts index 61bc02fedd..bbd40fc1f1 100644 --- a/packages/core/src/command/consolidated/search/fhir-resource/__tests__/search-consolidated.test.ts +++ b/packages/core/src/command/consolidated/search/fhir-resource/__tests__/search-consolidated.test.ts @@ -6,7 +6,7 @@ import { makeLocation } from "../../../../../external/fhir/__tests__/location"; import { makePatient, PatientWithId } from "../../../../../external/fhir/__tests__/patient"; import { makeReference } from "../../../../../external/fhir/__tests__/reference"; import { FhirSearchResult } from "../../../../../external/opensearch/index-based-on-fhir"; -import { OpenSearchFhirSearcher } from 
"../../../../../external/opensearch/lexical/fhir-searcher"; +import { OpenSearchConsolidatedSearcher } from "../../../../../external/opensearch/lexical/fhir-searcher"; import { getEntryId as getEntryIdFromOpensearch } from "../../../../../external/opensearch/shared/id"; import { makeCondition } from "../../../../../fhir-to-cda/cda-templates/components/__tests__/make-condition"; import { @@ -29,7 +29,7 @@ describe("search-consolidated", () => { patient = makePatient(); patientId = patient.id; getByIds_mock = jest - .spyOn(OpenSearchFhirSearcher.prototype, "getByIds") + .spyOn(OpenSearchConsolidatedSearcher.prototype, "getByIds") .mockResolvedValue([]); toEntryId = makeToEntryId(cxId, patientId); toGetByIdsResultEntry = makeToGetByIdsResultEntry(cxId, patientId, toEntryId); diff --git a/packages/core/src/command/consolidated/search/fhir-resource/fhir-config.ts b/packages/core/src/command/consolidated/search/fhir-resource/fhir-config.ts index 90e7187844..3ae5c82356 100644 --- a/packages/core/src/command/consolidated/search/fhir-resource/fhir-config.ts +++ b/packages/core/src/command/consolidated/search/fhir-resource/fhir-config.ts @@ -1,7 +1,7 @@ -import { OpenSearchFhirSearcherConfig } from "../../../../external/opensearch/lexical/fhir-searcher"; +import { OpenSearchConsolidatedSearcherConfig } from "../../../../external/opensearch/lexical/fhir-searcher"; import { Config } from "../../../../util/config"; -export function getConfigs(): OpenSearchFhirSearcherConfig { +export function getConfigs(): OpenSearchConsolidatedSearcherConfig { return { region: Config.getAWSRegion(), endpoint: Config.getSearchEndpoint(), diff --git a/packages/core/src/command/consolidated/search/fhir-resource/ingest-if-needed.ts b/packages/core/src/command/consolidated/search/fhir-resource/ingest-if-needed.ts index 849efa7417..1543926753 100644 --- a/packages/core/src/command/consolidated/search/fhir-resource/ingest-if-needed.ts +++ 
b/packages/core/src/command/consolidated/search/fhir-resource/ingest-if-needed.ts @@ -1,7 +1,9 @@ import { buildDayjs } from "@metriport/shared/common/date"; import { sleep } from "@metriport/shared/common/sleep"; +import { Features } from "../../../../domain/features"; import { Patient } from "../../../../domain/patient"; -import { OpenSearchFhirSearcher } from "../../../../external/opensearch/lexical/fhir-searcher"; +import { CloudWatchUtils, Metrics } from "../../../../external/aws/cloudwatch"; +import { OpenSearchConsolidatedSearcher } from "../../../../external/opensearch/lexical/fhir-searcher"; import { Config } from "../../../../util/config"; import { getConfigs } from "./fhir-config"; import { IngestConsolidatedDirect } from "./ingest-consolidated-direct"; @@ -9,6 +11,11 @@ import { IngestConsolidatedDirect } from "./ingest-consolidated-direct"; const WAIT_AFTER_INGESTION_IN_MILLIS = 1_000; const consolidatedDataIngestionInitialDate = Config.getConsolidatedDataIngestionInitialDate(); +const cloudWatchUtils = new CloudWatchUtils( + Config.getAWSRegion(), + Features.ConsolidatedIngestIfNeeded +); + /** * Ingests consolidated data if needed and applicable to the patient. 
* @@ -28,16 +35,27 @@ export async function ingestIfNeeded(patient: Patient): Promise { ) { return; } + const metrics: Metrics = {}; + const startedAt = Date.now(); const { cxId, id: patientId } = patient; - const searchService = new OpenSearchFhirSearcher(getConfigs()); + let localStartedAt = Date.now(); + const searchService = new OpenSearchConsolidatedSearcher(getConfigs()); const isIngested = await searchService.hasData({ cxId, patientId }); + let elapsedTime = Date.now() - localStartedAt; + metrics.ingestIfNeeded_checkHasData = { duration: elapsedTime, timestamp: new Date() }; if (!isIngested) { + localStartedAt = Date.now(); const ingestor = new IngestConsolidatedDirect(); await ingestor.ingestConsolidatedIntoSearchEngine({ cxId, patientId }); + elapsedTime = Date.now() - localStartedAt; + metrics.ingestIfNeeded_ingest = { duration: elapsedTime, timestamp: new Date() }; await sleep(WAIT_AFTER_INGESTION_IN_MILLIS); } + + metrics.ingestIfNeeded_total = { duration: Date.now() - startedAt, timestamp: new Date() }; + await cloudWatchUtils.reportMetrics(metrics); } diff --git a/packages/core/src/command/consolidated/search/fhir-resource/ingest-lexical.ts b/packages/core/src/command/consolidated/search/fhir-resource/ingest-lexical.ts index afc07d39bd..73b917000c 100644 --- a/packages/core/src/command/consolidated/search/fhir-resource/ingest-lexical.ts +++ b/packages/core/src/command/consolidated/search/fhir-resource/ingest-lexical.ts @@ -1,14 +1,18 @@ import { Bundle, Resource } from "@medplum/fhirtypes"; import { MetriportError } from "@metriport/shared"; -import { timed } from "@metriport/shared/util/duration"; +import { Features } from "../../../../domain/features"; import { Patient } from "../../../../domain/patient"; +import { CloudWatchUtils, Metrics, withMetrics } from "../../../../external/aws/cloudwatch"; import { normalize } from "../../../../external/fhir/consolidated/normalize"; import { OpenSearchFhirIngestor } from 
"../../../../external/opensearch/fhir-ingestor"; import { OnBulkItemError } from "../../../../external/opensearch/shared/bulk"; import { out } from "../../../../util"; +import { Config } from "../../../../util/config"; import { getConsolidatedFile } from "../../consolidated-get"; import { getConfigs } from "./fhir-config"; +const cloudWatchUtils = new CloudWatchUtils(Config.getAWSRegion(), Features.ConsolidatedIngestion); + /** * Ingest a patient's consolidated resources into OpenSearch for lexical search. * @@ -23,14 +27,23 @@ export async function ingestPatientConsolidated({ }): Promise { const { cxId, id: patientId } = patient; const { log } = out(`ingestPatientConsolidated - cx ${cxId}, pt ${patientId}`); + const metrics: Metrics = {}; + const startedAt = Date.now(); const ingestor = new OpenSearchFhirIngestor(getConfigs()); log("Getting consolidated and cleaning up the index..."); + const localStartedAt = Date.now(); const [bundle] = await Promise.all([ - timed(() => getConsolidatedBundle({ cxId, patientId }), "getConsolidatedBundle", log), - ingestor.delete({ cxId, patientId }), + withMetrics( + () => getConsolidatedBundle({ cxId, patientId }), + "ingest_getConsolidatedBundle", + metrics, + log + ), + withMetrics(() => ingestor.delete({ cxId, patientId }), "ingest_deleteIngested", metrics, log), ]); + metrics.ingest_preIngest = { duration: Date.now() - localStartedAt, timestamp: new Date() }; const resources = bundle.entry?.flatMap(entry => { @@ -41,14 +54,15 @@ export async function ingestPatientConsolidated({ }) ?? 
[]; log("Done, calling ingestBulk..."); - const startedAt = Date.now(); - const errors = await ingestor.ingestBulk({ - cxId, - patientId, - resources, - onItemError, - }); + const errors = await withMetrics( + () => ingestor.ingestBulk({ cxId, patientId, resources, onItemError }), + "ingest_ingestBulk", + metrics + ); + const elapsedTime = Date.now() - startedAt; + metrics.ingest_total = { duration: elapsedTime, timestamp: new Date() }; + await cloudWatchUtils.reportMetrics(metrics); if (errors.size > 0) processErrors({ cxId, patientId, errors, log }); diff --git a/packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts b/packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts index 2a2ed52fa3..f27d1a0dda 100644 --- a/packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts +++ b/packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts @@ -2,9 +2,10 @@ import { Resource } from "@medplum/fhirtypes"; import { errorToString } from "@metriport/shared"; import { elapsedTimeFromNow } from "@metriport/shared/common/date"; import { SearchSetBundle } from "@metriport/shared/medical"; -import { timed } from "@metriport/shared/util/duration"; import { uniq } from "lodash"; +import { Features } from "../../../../domain/features"; import { Patient } from "../../../../domain/patient"; +import { CloudWatchUtils, Metrics, withMetrics } from "../../../../external/aws/cloudwatch"; import { toFHIR as patientToFhir } from "../../../../external/fhir/patient/conversion"; import { buildBundleEntry, @@ -15,14 +16,17 @@ import { FhirSearchResult, rawContentFieldName, } from "../../../../external/opensearch/index-based-on-fhir"; -import { OpenSearchFhirSearcher } from "../../../../external/opensearch/lexical/fhir-searcher"; +import { OpenSearchConsolidatedSearcher } from "../../../../external/opensearch/lexical/fhir-searcher"; import { getEntryId } from 
"../../../../external/opensearch/shared/id"; import { out } from "../../../../util"; +import { Config } from "../../../../util/config"; import { searchDocuments } from "../document-reference/search"; import { getConfigs } from "./fhir-config"; const maxHydrationAttempts = 5; +const cloudWatchUtils = new CloudWatchUtils(Config.getAWSRegion(), Features.ConsolidatedSearch); + export type SearchConsolidatedParams = { patient: Patient; query: string | undefined; @@ -55,10 +59,11 @@ export async function searchPatientConsolidated({ const { log } = out(`searchPatientConsolidated - cx ${patient.cxId}, pt ${patient.id}`); log(`Getting consolidated and searching OS...`); - const startedAt = new Date(); + const metrics: Metrics = {}; + const startedAt = Date.now(); - const searchFhirResourcesPromise = () => - searchFhirResources({ + const searchConsolidatedPromise = () => + searchConsolidated({ cxId: patient.cxId, patientId: patient.id, query, @@ -67,15 +72,15 @@ export async function searchPatientConsolidated({ const searchDocumentsPromise = () => searchDocuments({ cxId: patient.cxId, patientId: patient.id, contentFilter: query }); + let localStartedAt = Date.now(); const [fhirResourcesResults, docRefResults] = await Promise.all([ - timed(searchFhirResourcesPromise, "searchFhirResources", log), - timed(searchDocumentsPromise, "searchDocuments", log), + withMetrics(searchConsolidatedPromise, "search_consolidated", metrics, log), + withMetrics(searchDocumentsPromise, "search_documents", metrics, log), ]); - + let elapsedTime = Date.now() - localStartedAt; + metrics.search_subTotal = { duration: elapsedTime, timestamp: new Date() }; log( - `Got ${fhirResourcesResults.length} resources and ${ - docRefResults.length - } DocRefs in ${elapsedTimeFromNow(startedAt)} ms` + `Got ${fhirResourcesResults.length} resources and ${docRefResults.length} DocRefs in ${elapsedTime} ms, hydrating search results...` ); let subStartedAt = new Date(); @@ -89,13 +94,15 @@ export async function 
searchPatientConsolidated({ )} ms, hydrating search results...` ); - subStartedAt = new Date(); + localStartedAt = Date.now(); const hydratedMutable = await hydrateMissingReferences({ cxId: patient.cxId, patientId: patient.id, resources: resourcesMutable, }); - log(`Hydrated to ${hydratedMutable.length} resources in ${elapsedTimeFromNow(subStartedAt)} ms.`); + elapsedTime = Date.now() - localStartedAt; + metrics.search_hydrate = { duration: elapsedTime, timestamp: new Date() }; + log(`Hydrated to ${hydratedMutable.length} resources in ${elapsedTime} ms.`); const patientResource = patientToFhir(patient); hydratedMutable.push(patientResource); @@ -103,16 +110,15 @@ export async function searchPatientConsolidated({ const entries = hydratedMutable.map(buildBundleEntry); const resultBundle = buildSearchSetBundle(entries); - log( - `Done in ${elapsedTimeFromNow(startedAt)} ms, returning ${ - resultBundle.entry?.length - } resources...` - ); + elapsedTime = Date.now() - startedAt; + metrics.search_total = { duration: elapsedTime, timestamp: new Date() }; + await cloudWatchUtils.reportMetrics(metrics); + log(`Done in ${elapsedTime} ms, returning ${resultBundle.entry?.length} resources...`); return resultBundle; } -async function searchFhirResources({ +async function searchConsolidated({ cxId, patientId, query, @@ -121,7 +127,7 @@ async function searchFhirResources({ patientId: string; query: string; }): Promise { - const searchService = new OpenSearchFhirSearcher(getConfigs()); + const searchService = new OpenSearchConsolidatedSearcher(getConfigs()); return await searchService.search({ cxId, patientId, @@ -152,7 +158,7 @@ export async function hydrateMissingReferences({ const uniqueIds = uniq(missingRefIds); - const searchService = new OpenSearchFhirSearcher(getConfigs()); + const searchService = new OpenSearchConsolidatedSearcher(getConfigs()); const openSearchResults = await searchService.getByIds({ cxId, patientId, diff --git a/packages/core/src/domain/features.ts 
b/packages/core/src/domain/features.ts new file mode 100644 index 0000000000..74c56557a5 --- /dev/null +++ b/packages/core/src/domain/features.ts @@ -0,0 +1,6 @@ +export enum Features { + ConsolidatedSearch = "consolidated-search", + ConsolidatedIngestion = "consolidated-ingestion", + ConsolidatedIngestIfNeeded = "consolidated-ingest-if-needed", + OpenSearchConsolidatedSearcher = "OpenSearchConsolidatedSearcher", +} diff --git a/packages/core/src/external/aws/cloudwatch.ts b/packages/core/src/external/aws/cloudwatch.ts new file mode 100644 index 0000000000..fa51687ad1 --- /dev/null +++ b/packages/core/src/external/aws/cloudwatch.ts @@ -0,0 +1,172 @@ +import { errorToString } from "@metriport/shared"; +import CloudWatch, { MetricData, MetricDatum } from "aws-sdk/clients/cloudwatch"; +import { capture } from "../../util/notifications"; +import { kbToMb, kbToMbString } from "../../util/units"; + +export type DurationMetric = { duration: number; count?: undefined; timestamp: Date }; +export type CountMetric = { duration?: undefined; count: number; timestamp: Date }; + +export type Metrics = Record<string, DurationMetric | CountMetric>; + +export const DEFAULT_METRICS_NAMESPACE = "Metriport"; + +/** + * Utility class for reporting metrics to CloudWatch. + * + * The CloudWatch namespace defaults to `DEFAULT_METRICS_NAMESPACE` and can be overridden in + * the constructor or on each individual call. + * + * Example: + * ``` + * const cloudWatchUtils = new CloudWatchUtils(region, context); + * await cloudWatchUtils.reportMetrics({ + * myMetric: { duration: 100, timestamp: new Date() }, + * }); + * ``` + * + * You can also group many metrics in one call: + * ``` + * const cloudWatchUtils = new CloudWatchUtils(region, context); + * const metrics: Metrics = {}; + * metrics.myMetric = { duration: 100, timestamp: new Date() }; + * ... + * metrics.myOtherMetric = { duration: 200, timestamp: new Date() }; + * ...
+ * await cloudWatchUtils.reportMetrics(metrics); + * ``` + */ +export class CloudWatchUtils { + public readonly _cloudWatch: CloudWatch; + + /** + * @param region - The AWS region to use for the CloudWatch client. + * @param context - The context to use for the metrics, like use case, product feature, + * lambda name, etc. + * @param metricsNamespace - The (CloudWatch) namespace to use for the metrics. Optional, + * defaults to `Metriport`. + */ + constructor( + readonly region: string, + readonly context: string, + readonly metricsNamespace: string = DEFAULT_METRICS_NAMESPACE + ) { + this._cloudWatch = new CloudWatch({ apiVersion: "2010-08-01", region }); + } + + get cloudWatch(): CloudWatch { + return this._cloudWatch; + } + + async reportMetrics(metrics: Metrics, metricsNamespace?: string) { + const namespaceToUse = metricsNamespace ?? this.metricsNamespace; + if (!namespaceToUse) throw new Error(`Missing metricsNamespace`); + const durationMetric = (name: string, values: DurationMetric): MetricDatum => ({ + MetricName: name, + Value: values.duration, + Unit: "Milliseconds", + Timestamp: values.timestamp, + Dimensions: [{ Name: "Service", Value: this.context }], + }); + const countMetric = (name: string, values: CountMetric): MetricDatum => ({ + MetricName: name, + Value: values.count, + Unit: "Count", + Timestamp: values.timestamp, + Dimensions: [{ Name: "Service", Value: this.context }], + }); + try { + const metricData: MetricData = []; + for (const [key, value] of Object.entries(metrics)) { + if (value.duration !== undefined) { + metricData.push(durationMetric(key, value)); + } else if (value.count !== undefined) { + metricData.push(countMetric(key, value)); + } + } + await this._cloudWatch + .putMetricData({ MetricData: metricData, Namespace: namespaceToUse }) + .promise(); + } catch (error) { + const msg = "Failed to report metrics"; + console.log(`${msg}, ${JSON.stringify(metrics)} - ${errorToString(error)}`); + capture.error(msg, { extra: { metrics, error } }); + // intentionally not
rethrowing, don't want to fail the lambda + } + } + + /** + * Report memory usage to CloudWatch, under our custom namespace. + * + * NOTE: metricName should be defined, unless we're capturing the memory usage a single time + * per execution (e.g., lambda invocation). + * + * @param metricsNamespace - The namespace to use for the metrics. + * @param metricName - The name of the metric (e.g., "preSetup", "postSetup"). + */ + async reportMemoryUsage({ + metricsNamespace, + metricName, + }: { + metricsNamespace?: string; + metricName?: string; + } = {}) { + const namespaceToUse = metricsNamespace ?? this.metricsNamespace; + if (!namespaceToUse) throw new Error(`Missing metricsNamespace`); + const mem = process.memoryUsage(); + logMemoryUsage(mem); + try { + await this._cloudWatch + .putMetricData({ + MetricData: [ + { + MetricName: metricName ?? "Memory total", + Value: kbToMb(mem.rss), + Unit: "Megabytes", + Timestamp: new Date(), + Dimensions: [{ Name: "Service", Value: this.context }], + }, + ], + Namespace: namespaceToUse, + }) + .promise(); + } catch (error) { + const msg = "Failed to report memory usage"; + console.log(`${msg} - ${errorToString(error)}`); + capture.error(msg, { extra: { error } }); + // intentionally not rethrowing, don't want to fail the lambda + } + } +} + +export function logMemoryUsage(mem = process.memoryUsage()) { + console.log( + `[MEM] rss: ${kbToMbString(mem.rss)}, ` + + `heap: ${kbToMbString(mem.heapUsed)}/${kbToMbString(mem.heapTotal)}, ` + + `external: ${kbToMbString(mem.external)}, ` + + `arrayBuffers: ${kbToMbString(mem.arrayBuffers)}, ` + ); +} + +/** + * Executes a function and stores the time it took to execute it in the metrics object. + * Optionally logs the time it took to execute it if the log parameter is provided. + * + * @param fn - The function to execute. + * @param name - The name of the function to log. Also used to populate metrics if present. + * @param metrics - The metrics to populate. 
+ * @param log - The logger to use. Optional, doesn't log if not provided. + * @returns The result of the function. + */ +export async function withMetrics<T>( + fn: () => Promise<T>, + name: string, + metrics: Metrics, + log?: typeof console.log +): Promise<T> { + const startedAt = Date.now(); + const result = await fn(); + const elapsedTime = Date.now() - startedAt; + metrics[name] = { duration: elapsedTime, timestamp: new Date() }; + if (log) log(`Done ${name} in ${elapsedTime} ms`); + return result; +} diff --git a/packages/core/src/external/opensearch/lexical/fhir-searcher.ts b/packages/core/src/external/opensearch/lexical/fhir-searcher.ts index af2f203e65..21d91886d4 100644 --- a/packages/core/src/external/opensearch/lexical/fhir-searcher.ts +++ b/packages/core/src/external/opensearch/lexical/fhir-searcher.ts @@ -1,6 +1,9 @@ import { errorToString } from "@metriport/shared"; import { Client } from "@opensearch-project/opensearch"; +import { Features } from "../../../domain/features"; import { capture, out } from "../../../util"; +import { Config } from "../../../util/config"; +import { CloudWatchUtils, Metrics, withMetrics } from "../../aws/cloudwatch"; import { OpenSearchConfigDirectAccess, OpenSearchResponse, @@ -13,7 +16,7 @@ import { getEntryId } from "../shared/id"; import { createSearchByIdsQuery } from "../shared/query"; import { createLexicalSearchQuery, createQueryHasData } from "./query"; -export type OpenSearchFhirSearcherConfig = OpenSearchConfigDirectAccess; +export type OpenSearchConsolidatedSearcherConfig = OpenSearchConfigDirectAccess; export type SearchRequest = { cxId: string; @@ -27,8 +30,15 @@ export type GetByIdRequest = { resourceId: string; }; -export class OpenSearchFhirSearcher { - constructor(readonly config: OpenSearchFhirSearcherConfig) {} +export class OpenSearchConsolidatedSearcher { + private readonly cloudWatchUtils: CloudWatchUtils; + + constructor(readonly config: OpenSearchConsolidatedSearcherConfig) { + this.cloudWatchUtils = new
CloudWatchUtils( + Config.getAWSRegion(), + Features.OpenSearchConsolidatedSearcher + ); + } async search({ cxId, patientId, query }: SearchRequest): Promise { const { log } = out(`${this.constructor.name}.search - cx ${cxId}, pt ${patientId}`); @@ -44,12 +54,19 @@ export class OpenSearchFhirSearcher { query, }); - const response = await paginatedSearch({ - client, - indexName, - searchRequest, - mapResults, - }); + const metrics: Metrics = {}; + const response = await withMetrics( + () => + paginatedSearch({ + client, + indexName, + searchRequest, + mapResults, + }), + "search", + metrics + ); + await this.cloudWatchUtils.reportMetrics(metrics); log(`Successfully searched, got ${response.count} results`); return response.items; @@ -65,7 +82,13 @@ export class OpenSearchFhirSearcher { log(`Checking if data exists on index ${indexName}...`); const searchRequest = createQueryHasData({ cxId, patientId }); - const response = await client.search({ index: indexName, body: searchRequest }); + const metrics: Metrics = {}; + const response = await withMetrics( + () => client.search({ index: indexName, body: searchRequest }), + "hasData", + metrics + ); + await this.cloudWatchUtils.reportMetrics(metrics); const body = response.body as OpenSearchResponse; const hasData = body.hits.hits ? 
body.hits.hits.length > 0 : false; @@ -134,12 +157,19 @@ export class OpenSearchFhirSearcher { ids, }); - const response = await paginatedSearch({ - client, - indexName, - searchRequest, - mapResults, - }); + const metrics: Metrics = {}; + const response = await withMetrics( + () => + paginatedSearch({ + client, + indexName, + searchRequest, + mapResults, + }), + "getByIds", + metrics + ); + await this.cloudWatchUtils.reportMetrics(metrics); return response.items; } diff --git a/packages/core/src/util/units.ts b/packages/core/src/util/units.ts new file mode 100644 index 0000000000..9b9a449c73 --- /dev/null +++ b/packages/core/src/util/units.ts @@ -0,0 +1,7 @@ +export function kbToMbString(value: number) { + return Number(kbToMb(value)).toFixed(2) + "MB"; +} + +export function kbToMb(value: number) { + return value / 1048576; +} diff --git a/packages/lambdas/src/shared/cloudwatch.ts b/packages/lambdas/src/shared/cloudwatch.ts index 6947166791..97fcbfe3ba 100644 --- a/packages/lambdas/src/shared/cloudwatch.ts +++ b/packages/lambdas/src/shared/cloudwatch.ts @@ -12,6 +12,8 @@ export type Metrics = Record; * * Requires either a `metricsNamespace` to be passed to the constructor or * passed to the individual functions. + * + * @deprecated Use `CloudWatchUtils` from `@metriport/core/external/aws/cloudwatch` instead. */ export class CloudWatchUtils { public readonly _cloudWatch: CloudWatch; @@ -28,6 +30,9 @@ export class CloudWatchUtils { return this._cloudWatch; } + /** + * @deprecated Use `CloudWatchUtils` from `@metriport/core/external/aws/cloudwatch` instead. + */ async reportMetrics(metrics: Metrics, metricsNamespace?: string) { const namespaceToUse = metricsNamespace ?? this.metricsNamespace; if (!namespaceToUse) throw new Error(`Missing metricsNamespace`); @@ -72,6 +77,8 @@ export class CloudWatchUtils { * * @param metricsNamespace - The namespace to use for the metrics. * @param metricName - The name of the metric (e.g., "preSetup", "postSetup"). 
+ * + * @deprecated Use `CloudWatchUtils` from `@metriport/core/external/aws/cloudwatch` instead. */ async reportMemoryUsage({ metricsNamespace, @@ -107,6 +114,9 @@ export class CloudWatchUtils { } } +/** + * @deprecated Use `logMemoryUsage` from `@metriport/core/external/aws/cloudwatch` instead. + */ export function logMemoryUsage(mem = process.memoryUsage()) { console.log( `[MEM] rss: ${kbToMbString(mem.rss)}, ` + diff --git a/packages/lambdas/src/shared/units.ts b/packages/lambdas/src/shared/units.ts index 9b9a449c73..ea2d3ae1bb 100644 --- a/packages/lambdas/src/shared/units.ts +++ b/packages/lambdas/src/shared/units.ts @@ -1,7 +1,18 @@ +import { + kbToMb as kbToMbFromCore, + kbToMbString as kbToMbStringFromCore, +} from "@metriport/core/util/units"; + +/** + * @deprecated Use `CloudWatchUtils` from `@metriport/core/util/units` instead. + */ export function kbToMbString(value: number) { - return Number(kbToMb(value)).toFixed(2) + "MB"; + return kbToMbStringFromCore(value); } +/** + * @deprecated Use `CloudWatchUtils` from `@metriport/core/util/units` instead. 
+ */ export function kbToMb(value: number) { - return value / 1048576; + return kbToMbFromCore(value); } From 3175cb710d8260aeab09b99fd02d0cb25b91fd3d Mon Sep 17 00:00:00 2001 From: Rafael Leite <2132564+leite08@users.noreply.github.com> Date: Mon, 2 Jun 2025 13:47:29 -0300 Subject: [PATCH 2/3] fix(api): small adj on type and comment Ref eng-365 Signed-off-by: Rafael Leite <2132564+leite08@users.noreply.github.com> --- packages/core/src/domain/features.ts | 2 +- packages/lambdas/src/shared/units.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/core/src/domain/features.ts b/packages/core/src/domain/features.ts index 74c56557a5..62301b1ff0 100644 --- a/packages/core/src/domain/features.ts +++ b/packages/core/src/domain/features.ts @@ -2,5 +2,5 @@ export enum Features { ConsolidatedSearch = "consolidated-search", ConsolidatedIngestion = "consolidated-ingestion", ConsolidatedIngestIfNeeded = "consolidated-ingest-if-needed", - OpenSearchConsolidatedSearcher = "OpenSearchConsolidatedSearcher", + OpenSearchConsolidatedSearcher = "consolidated-search-open-search", } diff --git a/packages/lambdas/src/shared/units.ts b/packages/lambdas/src/shared/units.ts index ea2d3ae1bb..9df6991d4a 100644 --- a/packages/lambdas/src/shared/units.ts +++ b/packages/lambdas/src/shared/units.ts @@ -4,14 +4,14 @@ import { } from "@metriport/core/util/units"; /** - * @deprecated Use `CloudWatchUtils` from `@metriport/core/util/units` instead. + * @deprecated Use `kbToMbString` from `@metriport/core/util/units` instead. */ export function kbToMbString(value: number) { return kbToMbStringFromCore(value); } /** - * @deprecated Use `CloudWatchUtils` from `@metriport/core/util/units` instead. + * @deprecated Use `kbToMb` from `@metriport/core/util/units` instead. 
*/ export function kbToMb(value: number) { return kbToMbFromCore(value); From 75bb9ee1a85199cda632bd3b3a7e7b9dbd4fbbed Mon Sep 17 00:00:00 2001 From: Rafael Leite <2132564+leite08@users.noreply.github.com> Date: Tue, 3 Jun 2025 17:08:20 -0700 Subject: [PATCH 3/3] fix(core): remove unnecessary duration to convert resources Ref eng-365 Signed-off-by: Rafael Leite <2132564+leite08@users.noreply.github.com> --- .../search/fhir-resource/search-consolidated.ts | 7 ------- 1 file changed, 7 deletions(-) diff --git a/packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts b/packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts index f27d1a0dda..2b97c7db98 100644 --- a/packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts +++ b/packages/core/src/command/consolidated/search/fhir-resource/search-consolidated.ts @@ -1,6 +1,5 @@ import { Resource } from "@medplum/fhirtypes"; import { errorToString } from "@metriport/shared"; -import { elapsedTimeFromNow } from "@metriport/shared/common/date"; import { SearchSetBundle } from "@metriport/shared/medical"; import { uniq } from "lodash"; import { Features } from "../../../../domain/features"; @@ -83,16 +82,10 @@ export async function searchPatientConsolidated({ `Got ${fhirResourcesResults.length} resources and ${docRefResults.length} DocRefs in ${elapsedTime} ms, hydrating search results...` ); - let subStartedAt = new Date(); const resourcesMutable = fhirResourcesResults.flatMap( r => fhirSearchResultToResource(r, log) ?? [] ); resourcesMutable.push(...docRefResults); - log( - `Loaded/converted ${resourcesMutable.length} resources in ${elapsedTimeFromNow( - subStartedAt - )} ms, hydrating search results...` - ); localStartedAt = Date.now(); const hydratedMutable = await hydrateMissingReferences({