Skip to content

Commit

Permalink
15153: update FHIRConvert to create empty report/lineages for items w…
Browse files Browse the repository at this point in the history
…ith no next action (#15224)

* 15153: update FHIRConvert to create empty report/lineages for items with no next action

* fixup! 15153: update FHIRConvert to create empty report/lineages for items with no next action

* fixup! 15153: update FHIRConvert to create empty report/lineages for items with no next action

* Delete duplicated FHIR Converter integration tests

* fixup! 15153: update FHIRConvert to create empty report/lineages for items with no next action

* Update prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt

Co-authored-by: Angela DeGolier <[email protected]>

---------

Co-authored-by: Angela DeGolier <[email protected]>
  • Loading branch information
2 people authored and jack-h-wang committed Jul 26, 2024
1 parent fe7e76f commit 42c658b
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 71 deletions.
140 changes: 84 additions & 56 deletions prime-router/src/main/kotlin/fhirengine/engine/FHIRConverter.kt
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,13 @@ class FHIRConverter(
// TODO: https://github.com/CDCgov/prime-reportstream/issues/14287
FhirPathUtils

val fhirBundles = process(format, queueMessage, actionLogger)
val processedItems = process(format, queueMessage, actionLogger)

if (fhirBundles.isNotEmpty()) {
// processedItems can be empty in three scenarios:
// - the blob had no contents, i.e. an empty file was submitted
// - the format is HL7 and the contents were not parseable, so the number of items is unknown
// - the format is unexpected like CSV
if (processedItems.isNotEmpty()) {
return LogMeasuredTime.measureAndLogDurationWithReturnedValue(
"Applied sender transform and routed"
) {
Expand All @@ -137,68 +141,92 @@ class FHIRConverter(
)

maybeParallelize(
fhirBundles.size,
Streams.mapWithIndex(fhirBundles.stream()) { bundle, index ->
processedItems.size,
Streams.mapWithIndex(processedItems.stream()) { bundle, index ->
Pair(bundle, index)
},
"Applying sender transforms and routing"
).map { (bundle, bundleIndex) ->
).map { (processedItem, itemIndex) ->
// conduct FHIR Transform
transformer?.process(bundle)

// make a 'report'
val report = Report(
MimeFormat.FHIR,
emptyList(),
parentItemLineageData = listOf(
Report.ParentItemLineageData(queueMessage.reportId, bundleIndex.toInt() + 1)
),
metadata = this.metadata,
topic = queueMessage.topic,
nextAction = TaskAction.destination_filter
)
if (processedItem.bundle == null) {
val report = Report(
MimeFormat.FHIR,
emptyList(),
parentItemLineageData = listOf(
Report.ParentItemLineageData(queueMessage.reportId, itemIndex.toInt() + 1)
),
metadata = this.metadata,
topic = queueMessage.topic,
nextAction = TaskAction.none
)
val noneEvent = ProcessEvent(
Event.EventAction.NONE,
report.id,
Options.None,
emptyMap(),
emptyList()
)
actionHistory.trackCreatedReport(noneEvent, report)
null
} else {
// We know from the null check above that this cannot be null
val bundle = processedItem.bundle!!
transformer?.process(bundle)

// make a 'report'
val report = Report(
MimeFormat.FHIR,
emptyList(),
parentItemLineageData = listOf(
Report.ParentItemLineageData(queueMessage.reportId, itemIndex.toInt() + 1)
),
metadata = this.metadata,
topic = queueMessage.topic,
nextAction = TaskAction.destination_filter
)

// create route event
val routeEvent = ProcessEvent(
Event.EventAction.DESTINATION_FILTER,
report.id,
Options.None,
emptyMap(),
emptyList()
)
// create route event
val routeEvent = ProcessEvent(
Event.EventAction.DESTINATION_FILTER,
report.id,
Options.None,
emptyMap(),
emptyList()
)

// upload to blobstore
val bodyBytes = FhirTranscoder.encode(bundle).toByteArray()
val blobInfo = BlobAccess.uploadBody(
MimeFormat.FHIR,
bodyBytes,
report.id.toString(),
queueMessage.blobSubFolderName,
routeEvent.eventAction
)
// upload to blobstore
val bodyBytes = FhirTranscoder.encode(bundle).toByteArray()
val blobInfo = BlobAccess.uploadBody(
MimeFormat.FHIR,
bodyBytes,
report.id.toString(),
queueMessage.blobSubFolderName,
routeEvent.eventAction
)

// track created report
actionHistory.trackCreatedReport(routeEvent, report, blobInfo = blobInfo)
azureEventService.trackEvent(
ReportCreatedEvent(
report.id,
queueMessage.topic
// track created report
actionHistory.trackCreatedReport(routeEvent, report, blobInfo = blobInfo)
azureEventService.trackEvent(
ReportCreatedEvent(
report.id,
queueMessage.topic
)
)
)

FHIREngineRunResult(
routeEvent,
report,
blobInfo.blobUrl,
FhirDestinationFilterQueueMessage(
report.id,
FHIREngineRunResult(
routeEvent,
report,
blobInfo.blobUrl,
BlobAccess.digestToString(blobInfo.digest),
queueMessage.blobSubFolderName,
queueMessage.topic
FhirDestinationFilterQueueMessage(
report.id,
blobInfo.blobUrl,
BlobAccess.digestToString(blobInfo.digest),
queueMessage.blobSubFolderName,
queueMessage.topic
)
)
)
}.collect(Collectors.toList())
}
}.collect(Collectors.toList()).filterNotNull()
}
} else {
val nextEvent = ProcessEvent(
Expand Down Expand Up @@ -264,7 +292,7 @@ class FHIRConverter(
queueMessage: ReportPipelineMessage,
actionLogger: ActionLogger,
routeReportWithInvalidItems: Boolean = true,
): List<Bundle> {
): List<IProcessedItem<*>> {
val validator = queueMessage.topic.validator
val rawReport = queueMessage.downloadContent()
return if (rawReport.isBlank()) {
Expand Down Expand Up @@ -309,7 +337,7 @@ class FHIRConverter(
}

val areAllItemsParsedAndValid = processedItems.all { it.getError() == null }
val bundles = processedItems.mapNotNull { item ->
val bundles = processedItems.map { item ->
val error = item.getError()
if (error != null) {
actionLogger.getItemLogger(error.index + 1, item.getTrackingId()).error(error)
Expand All @@ -321,7 +349,7 @@ class FHIRConverter(
.warn(this)
}
}
item.bundle
item
}

withLoggingContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ class FHIRConverterIntegrationTests {
fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter())

ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val routedReports = fetchChildReports(receiveReport, txn, 2)
val (routedReports, _) = fetchChildReports(
receiveReport, txn, 4
).partition { it.nextAction != TaskAction.none }
// Verify that the expected FHIR bundles were uploaded
val reportAndBundles =
routedReports.map {
Expand Down Expand Up @@ -346,7 +348,9 @@ class FHIRConverterIntegrationTests {
fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter())

ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val routedReports = fetchChildReports(receiveReport, txn, 2)
val (routedReports, _) = fetchChildReports(
receiveReport, txn, 4
).partition { it.nextAction != TaskAction.none }
// Verify that the expected FHIR bundles were uploaded
val reportAndBundles =
routedReports.map {
Expand Down Expand Up @@ -425,7 +429,9 @@ class FHIRConverterIntegrationTests {
fhirFunctions.doConvert(queueMessage, 1, createFHIRConverter())

ReportStreamTestDatabaseContainer.testDatabaseAccess.transact { txn ->
val routedReports = fetchChildReports(receiveReport, txn, 1)
val (routedReports, _) = fetchChildReports(
receiveReport, txn, 2
).partition { it.nextAction != TaskAction.none }
// Verify that the expected FHIR bundles were uploaded
val reportAndBundles =
routedReports.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,9 @@ class FhirConverterTests {
every { mockMessage.topic } returns Topic.FULL_ELR
every { mockMessage.reportId } returns UUID.randomUUID()
every { mockMessage.downloadContent() } returns "{\"id\":}"
val bundles = engine.process(MimeFormat.FHIR, mockMessage, actionLogger)
assertThat(bundles).isEmpty()
val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger)
assertThat(processedItems).hasSize(1)
assertThat(processedItems.first().bundle).isNull()
assertThat(actionLogger.errors.map { it.detail.message }).contains(
@Suppress("ktlint:standard:max-line-length")
"Item 1 in the report was not parseable. Reason: exception while parsing FHIR: HAPI-1861: Failed to parse JSON encoded FHIR content: Unexpected character ('}' (code 125)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')\n at [line: 1, column: 7]"
Expand All @@ -640,8 +641,9 @@ class FhirConverterTests {
every { mockMessage.topic } returns Topic.FULL_ELR
every { mockMessage.reportId } returns UUID.randomUUID()
every { mockMessage.downloadContent() } returns "{\"id\":\"1\", \"resourceType\":\"Bundle\"}"
val bundles = engine.process(MimeFormat.FHIR, mockMessage, actionLogger)
assertThat(bundles).isEmpty()
val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger)
assertThat(processedItems).hasSize(1)
assertThat(processedItems.first().bundle).isNull()
assertThat(actionLogger.errors.map { it.detail.message }).contains(
"Item 1 in the report was not valid. Reason: Validation failed"
)
Expand All @@ -657,8 +659,9 @@ class FhirConverterTests {
every {
mockMessage.downloadContent()
} returns unparseableHL7
val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger)
assertThat(bundles).isEmpty()
val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger)
assertThat(processedItems).hasSize(1)
assertThat(processedItems.first().bundle).isNull()
assertThat(
actionLogger.errors.map {
it.detail.message
Expand Down Expand Up @@ -690,8 +693,9 @@ class FhirConverterTests {
every {
mockMessage.downloadContent()
} returns simpleHL7
val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger)
assertThat(bundles).isEmpty()
val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger)
assertThat(processedItems).hasSize(1)
assertThat(processedItems.first().bundle).isNull()
@Suppress("ktlint:standard:max-line-length")
assertThat(
actionLogger.errors.map {
Expand All @@ -717,8 +721,9 @@ class FhirConverterTests {
every {
mockMessage.downloadContent()
} returns simpleHL7
val bundles = engine.process(MimeFormat.HL7, mockMessage, actionLogger)
assertThat(bundles).isEmpty()
val processedItems = engine.process(MimeFormat.HL7, mockMessage, actionLogger)
assertThat(processedItems).hasSize(1)
assertThat(processedItems.first().bundle).isNull()
assertThat(
actionLogger.errors.map {
it.detail.message
Expand All @@ -741,8 +746,8 @@ class FhirConverterTests {
} returns """{\"id\":}
{"id":"1", "resourceType":"Bundle"}
""".trimMargin()
val bundles = engine.process(MimeFormat.FHIR, mockMessage, actionLogger)
assertThat(bundles).hasSize(1)
val processedItems = engine.process(MimeFormat.FHIR, mockMessage, actionLogger)
assertThat(processedItems).hasSize(2)
assertThat(actionLogger.errors.map { it.detail.message }).contains(
@Suppress("ktlint:standard:max-line-length")
"Item 1 in the report was not parseable. Reason: exception while parsing FHIR: HAPI-1861: Failed to parse JSON encoded FHIR content: Unexpected character ('\\' (code 92)): was expecting double-quote to start field name\n at [line: 1, column: 2]"
Expand Down

0 comments on commit 42c658b

Please sign in to comment.