From 42c658bef212eb637179e51951ba8ba594fa55cc Mon Sep 17 00:00:00 2001 From: Michael Kalish Date: Wed, 24 Jul 2024 19:00:19 -0400 Subject: [PATCH] 15153: update FHIRConvert to create empty report/lineages for items with no next action (#15224) * 15153: update FHIRConvert to create empty report/lineages for items with no next action * fixup! 15153: update FHIRConvert to create empty report/lineages for items with no next action * fixup! 15153: update FHIRConvert to create empty report/lineages for items with no next action * Delete duplicated FHIR Converter integration tests * fixup! 15153: update FHIRConvert to create empty report/lineages for items with no next action * Update prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt Co-authored-by: Angela DeGolier <39102333+adegolier@users.noreply.github.com> --------- Co-authored-by: Angela DeGolier <39102333+adegolier@users.noreply.github.com> --- .../kotlin/fhirengine/engine/FHIRConverter.kt | 140 +++++++++++------- .../azure/FHIRConverterIntegrationTests.kt | 12 +- .../fhirengine/engine/FhirConverterTests.kt | 29 ++-- 3 files changed, 110 insertions(+), 71 deletions(-) diff --git a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt index 1216187b2fa..9cc597b9b35 100644 --- a/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt +++ b/prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt @@ -126,9 +126,13 @@ class FHIRConverter( // TODO: https://github.com/CDCgov/prime-reportstream/issues/14287 FhirPathUtils - val fhirBundles = process(format, queueMessage, actionLogger) + val processedItems = process(format, queueMessage, actionLogger) - if (fhirBundles.isNotEmpty()) { + // processedItems can be empty in three scenarios: + // - the blob had no contents, i.e. an empty file was submitted + // - the format is HL7 and the contents were not parseable, so the number of items is unknown + // - the format is unexpected like CSV + if (processedItems.isNotEmpty()) { return LogMeasuredTime.measureAndLogDurationWithReturnedValue( "Applied sender transform and routed" ) { @@ -137,68 +141,92 @@ class FHIRConverter( ) maybeParallelize( - fhirBundles.size, - Streams.mapWithIndex(fhirBundles.stream()) { bundle, index -> + processedItems.size, + Streams.mapWithIndex(processedItems.stream()) { bundle, index -> Pair(bundle, index) }, "Applying sender transforms and routing" - ).map { (bundle, bundleIndex) -> + ).map { (processedItem, itemIndex) -> // conduct FHIR Transform - transformer?.process(bundle) - - // make a 'report' - val report = Report( - MimeFormat.FHIR, - emptyList(), - parentItemLineageData = listOf( - Report.ParentItemLineageData(queueMessage.reportId, bundleIndex.toInt() + 1) - ), - metadata = this.metadata, - topic = queueMessage.topic, - nextAction = TaskAction.destination_filter - ) + if (processedItem.bundle == null) { + val report = Report( + MimeFormat.FHIR, + emptyList(), + parentItemLineageData = listOf( + Report.ParentItemLineageData(queueMessage.reportId, itemIndex.toInt() + 1) + ), + metadata = this.metadata, + topic = queueMessage.topic, + nextAction = TaskAction.none + ) + val noneEvent = ProcessEvent( + Event.EventAction.NONE, + report.id, + Options.None, + emptyMap(), + emptyList() + ) + actionHistory.trackCreatedReport(noneEvent, report) + null + } else { + // We know from the null check above that this cannot be null + val bundle = processedItem.bundle!! + transformer?.process(bundle) + + // make a 'report' + val report = Report( + MimeFormat.FHIR, + emptyList(), + parentItemLineageData = listOf( + Report.ParentItemLineageData(queueMessage.reportId, itemIndex.toInt() + 1) + ), + metadata = this.metadata, + topic = queueMessage.topic, + nextAction = TaskAction.destination_filter + ) - // create route event - val routeEvent = ProcessEvent( - Event.EventAction.DESTINATION_FILTER, - report.id, - Options.None, - emptyMap(), - emptyList() - ) + // create route event + val routeEvent = ProcessEvent( + Event.EventAction.DESTINATION_FILTER, + report.id, + Options.None, + emptyMap(), + emptyList() + ) - // upload to blobstore - val bodyBytes = FhirTranscoder.encode(bundle).toByteArray() - val blobInfo = BlobAccess.uploadBody( - MimeFormat.FHIR, - bodyBytes, - report.id.toString(), - queueMessage.blobSubFolderName, - routeEvent.eventAction - ) + // upload to blobstore + val bodyBytes = FhirTranscoder.encode(bundle).toByteArray() + val blobInfo = BlobAccess.uploadBody( + MimeFormat.FHIR, + bodyBytes, + report.id.toString(), + queueMessage.blobSubFolderName, + routeEvent.eventAction + ) - // track created report - actionHistory.trackCreatedReport(routeEvent, report, blobInfo = blobInfo) - azureEventService.trackEvent( - ReportCreatedEvent( - report.id, - queueMessage.topic + // track created report + actionHistory.trackCreatedReport(routeEvent, report, blobInfo = blobInfo) + azureEventService.trackEvent( + ReportCreatedEvent( + report.id, + queueMessage.topic + ) ) - ) - FHIREngineRunResult( - routeEvent, - report, - blobInfo.blobUrl, - FhirDestinationFilterQueueMessage( - report.id, + FHIREngineRunResult( + routeEvent, + report, blobInfo.blobUrl, - BlobAccess.digestToString(blobInfo.digest), - queueMessage.blobSubFolderName, - queueMessage.topic + FhirDestinationFilterQueueMessage( + report.id, + blobInfo.blobUrl, + BlobAccess.digestToString(blobInfo.digest), + queueMessage.blobSubFolderName, + queueMessage.topic + ) ) - ) - }.collect(Collectors.toList()) + } + }.collect(Collectors.toList()).filterNotNull() } } else { val nextEvent = ProcessEvent( @@ -264,7 +292,7 @@ class FHIRConverter( queueMessage: ReportPipelineMessage, actionLogger: ActionLogger, routeReportWithInvalidItems: Boolean = true, - ): List { + ): List> { val validator = queueMessage.topic.validator val rawReport = queueMessage.downloadContent() return if (rawReport.isBlank()) { @@ -309,7 +337,7 @@ class FHIRConverter( } val areAllItemsParsedAndValid = processedItems.all { it.getError() == null } - val bundles = processedItems.mapNotNull { item -> + val bundles = processedItems.map { item -> val error = item.getError() if (error != null) { actionLogger.getItemLogger(error.index + 1, item.getTrackingId()).error(error) @@ -321,7 +349,7 @@ class FHIRConverter( .warn(this) } } - item.bundle + item } withLoggingContext( diff --git a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt index a46ef216ca4..a50740bf6a0 100644 --- a/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/azure/FHIRConverterIntegrationTests.kt @@ -226,7 +226,9 @@ class FHIRConverterIntegrationTests { fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val routedReports = fetchChildReports(receiveReport, txn, 2) + val (routedReports, _) = fetchChildReports( + receiveReport, txn, 4 + ).partition { it.nextAction != TaskAction.none } // Verify that the expected FHIR bundles were uploaded val reportAndBundles = routedReports.map { @@ -346,7 +348,9 @@ class FHIRConverterIntegrationTests { fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val routedReports = fetchChildReports(receiveReport, txn, 2) + val (routedReports, _) = fetchChildReports( + receiveReport, txn, 4 + ).partition { it.nextAction != TaskAction.none } // Verify that the expected FHIR bundles were uploaded val reportAndBundles = routedReports.map { @@ -425,7 +429,9 @@ class FHIRConverterIntegrationTests { fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter()) ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn -> - val routedReports = fetchChildReports(receiveReport, txn, 1) + val (routedReports, _) = fetchChildReports( + receiveReport, txn, 2 + ).partition { it.nextAction != TaskAction.none } // Verify that the expected FHIR bundles were uploaded val reportAndBundles = routedReports.map { diff --git a/prime-router/src/test/kotlin/fhirengine/engine/FhirConverterTests.kt b/prime-router/src/test/kotlin/fhirengine/engine/FhirConverterTests.kt index 047bd2beead..04aab3778e2 100644 --- a/prime-router/src/test/kotlin/fhirengine/engine/FhirConverterTests.kt +++ b/prime-router/src/test/kotlin/fhirengine/engine/FhirConverterTests.kt @@ -612,8 +612,9 @@ class FhirConverterTests { every { mockMessage.topic } returns Topic.FULL_ELR every { mockMessage.reportId } returns UUID.randomUUID() every { mockMessage.downloadContent() } returns "{\"id\":}" - val bundles = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) - assertThat(bundles).isEmpty() + val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) + assertThat(processedItems).hasSize(1) + assertThat(processedItems.first().bundle).isNull() assertThat(actionLogger.errors.map { it.detail.message }).contains( @Suppress("ktlint:standard:max-line-length") "Item 1 in the report was not parseable. Reason: exception while parsing FHIR: HAPI-1861: Failed to parse JSON encoded FHIR content: Unexpected character ('}' (code 125)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [line: 1, column: 7]" @@ -640,8 +641,9 @@ class FhirConverterTests { every { mockMessage.topic } returns Topic.FULL_ELR every { mockMessage.reportId } returns UUID.randomUUID() every { mockMessage.downloadContent() } returns "{\"id\":\"1\", \"resourceType\":\"Bundle\"}" - val bundles = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) - assertThat(bundles).isEmpty() + val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) + assertThat(processedItems).hasSize(1) + assertThat(processedItems.first().bundle).isNull() assertThat(actionLogger.errors.map { it.detail.message }).contains( "Item 1 in the report was not valid. Reason: Validation failed" ) @@ -657,8 +659,9 @@ class FhirConverterTests { every { mockMessage.downloadContent() } returns unparseableHL7 - val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger) - assertThat(bundles).isEmpty() + val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + assertThat(processedItems).hasSize(1) + assertThat(processedItems.first().bundle).isNull() assertThat( actionLogger.errors.map { it.detail.message @@ -690,8 +693,9 @@ class FhirConverterTests { every { mockMessage.downloadContent() } returns simpleHL7 - val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger) - assertThat(bundles).isEmpty() + val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + assertThat(processedItems).hasSize(1) + assertThat(processedItems.first().bundle).isNull() @Suppress("ktlint:standard:max-line-length") assertThat( actionLogger.errors.map { @@ -717,8 +721,9 @@ class FhirConverterTests { every { mockMessage.downloadContent() } returns simpleHL7 - val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger) - assertThat(bundles).isEmpty() + val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger) + assertThat(processedItems).hasSize(1) + assertThat(processedItems.first().bundle).isNull() assertThat( actionLogger.errors.map { it.detail.message @@ -741,8 +746,8 @@ class FhirConverterTests { } returns """{\"id\":} {"id":"1", "resourceType":"Bundle"} """.trimMargin() - val bundles = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) - assertThat(bundles).hasSize(1) + val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger) + assertThat(processedItems).hasSize(2) assertThat(actionLogger.errors.map { it.detail.message }).contains( @Suppress("ktlint:standard:max-line-length") "Item 1 in the report was not parseable. Reason: exception while parsing FHIR: HAPI-1861: Failed to parse JSON encoded FHIR content: Unexpected character ('\\' (code 92)): was expecting double-quote to start field name\n at [line: 1, column: 2]"