OMOP Odyssey - AWS HealthLake (Strait of Messina)
Welcome to the Strait of Messina. In this leg of the Odyssey, we’re navigating a narrow channel between two powerful cloud services: AWS HealthLake and the InterSystems OMOP Cloud Service. Our goal is to automate the flow of FHIR data from HealthLake into our analytical OMOP Common Data Model (CDM).
The Challenge: Bridging the Cloud
AWS HealthLake is a HIPAA-eligible service that stores healthcare data and transforms it into a searchable FHIR format. To get this data into our OMOP CDM, we need a reliable, automated way to export it from HealthLake and ingest it into InterSystems.
Step One: AWS HealthLake Setup
First, we ensure our HealthLake data store is populated and ready for export.
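A data store can only be exported once it is ACTIVE, so a quick pre-flight check is useful before triggering anything. The following is a minimal sketch, assuming access through a boto3 HealthLake client (`boto3.client('healthlake')`); the helper name `wait_for_active_datastore` and its polling parameters are hypothetical. It only confirms the data store's status, not that it actually contains resources.

```python
import time


def wait_for_active_datastore(client, datastore_id, poll_seconds=10, max_attempts=30):
    """Poll describe_fhir_datastore until the data store reports ACTIVE.

    `client` is a boto3 HealthLake client, or any object exposing the same
    describe_fhir_datastore(DatastoreId=...) method.
    """
    for _ in range(max_attempts):
        props = client.describe_fhir_datastore(DatastoreId=datastore_id)["DatastoreProperties"]
        status = props["DatastoreStatus"]
        if status == "ACTIVE":
            return props
        # These states will never turn into ACTIVE, so fail fast
        if status in ("DELETING", "DELETED", "CREATE_FAILED"):
            raise RuntimeError(f"Data store {datastore_id} is not usable: {status}")
        time.sleep(poll_seconds)  # still CREATING: wait and try again
    raise TimeoutError(f"Data store {datastore_id} did not become ACTIVE")
```

Calling `wait_for_active_datastore(boto3.client('healthlake'), datastore_id)` at the top of the export function turns a misconfigured data store into an immediate, readable error instead of a failed export job.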
Step Two: Automating the Export with Lambda
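We use an AWS Lambda function to start a HealthLake export job that writes our FHIR resources as NDJSON files to an S3 bucket. The function then polls the job until it finishes, packages the exported .ndjson files into a single ZIP, uploads that ZIP for InterSystems, and clears out the raw export landing area. This bucket acts as the staging area for our InterSystems ingestion. Because the function waits on the export and builds the ZIP in memory, its timeout must be raised well above the Lambda default of 3 seconds (the maximum is 15 minutes) and its memory sized to hold the whole export.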
```python
import io
import json
import os
import time
import uuid
import zipfile

import boto3

# Export job statuses after which HealthLake will not change the job again
TERMINAL_STATUSES = {'COMPLETED', 'COMPLETED_WITH_ERRORS', 'FAILED',
                     'CANCEL_COMPLETED', 'CANCEL_FAILED'}


def lambda_handler(event, context):
    # Boto3 clients
    s3 = boto3.client('s3')
    client = boto3.client('healthlake')

    # Settings: replace the bucket, data store ID, KMS key and IAM role with your own
    small_guid = uuid.uuid4().hex[:8]  # short unique ID for the client token and ZIP name
    bucket_name = 'intersystems-omop-fhir-bucket'
    prefix = 'export/'  # landing area for the raw export; must end with '/'
    output_zip_key = 'from-healthlake-to-intersystems-omop/healthlake_ndjson_' + small_guid + '.zip'
    datastore_id = '9ee0e51d987e#ai#8ca487e8e95b1d'

    # Step 1: Start the HealthLake export job, writing NDJSON into the landing area
    response = client.start_fhir_export_job(
        JobName='FHIR2OMOPJob',
        OutputDataConfig={
            'S3Configuration': {
                'S3Uri': f's3://{bucket_name}/{prefix}',
                'KmsKeyId': 'arn:aws:kms:us-east-2:12345:key/54918bec-#ai#-4710-9c18-1a65d0d4590b'
            }
        },
        DatastoreId=datastore_id,
        DataAccessRoleArn='arn:aws:iam::12345:role/service-role/AWSHealthLake-Export-2-OMOP',
        ClientToken=small_guid
    )
    job_id = response['JobId']
    print(f"Export job started: {job_id}")

    # Step 2: Poll until the job reaches a terminal status
    while True:
        status_response = client.describe_fhir_export_job(
            DatastoreId=datastore_id,
            JobId=job_id
        )
        status = status_response['ExportJobProperties']['JobStatus']
        print(f"Job status: {status}")
        if status in TERMINAL_STATUSES:
            break
        time.sleep(10)  # wait before polling again

    # Step 3: If completed, zip the NDJSON files and upload the ZIP
    if status == 'COMPLETED':
        output_uri = status_response['ExportJobProperties']['OutputDataConfig']['S3Configuration']['S3Uri']
        print(f"Export completed. Data available at: {output_uri}")

        # Collect every .ndjson object under the landing prefix
        ndjson_keys = []
        paginator = s3.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for obj in page.get('Contents', []):
                if obj['Key'].endswith('.ndjson'):
                    ndjson_keys.append(obj['Key'])

        # Build the ZIP in memory (the function's memory must fit the whole export)
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            for key in ndjson_keys:
                file_data = s3.get_object(Bucket=bucket_name, Key=key)['Body'].read()
                zf.writestr(os.path.basename(key), file_data)

        # Upload the ZIP to the prefix InterSystems ingests from
        s3.put_object(
            Bucket=bucket_name,
            Key=output_zip_key,
            Body=zip_buffer.getvalue()
        )
        print(f"Created ZIP with {len(ndjson_keys)} files at s3://{bucket_name}/{output_zip_key}")

        # Clean up the landing area; each page holds at most 1,000 keys,
        # which matches the per-call limit of delete_objects
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            delete_keys = [
                {'Key': obj['Key']}
                for obj in page.get('Contents', [])
                if obj['Key'] != prefix
            ]
            if delete_keys:
                s3.delete_objects(Bucket=bucket_name, Delete={'Objects': delete_keys})
                print(f"Deleted {len(delete_keys)} objects under {prefix}")
    else:
        print(f"Export job did not complete successfully. Status: {status}")

    return {
        'statusCode': 200,
        'body': json.dumps({'jobId': job_id, 'jobStatus': status})
    }
```
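The handler above polls with a fixed 10-second sleep and no deadline, but a single Lambda invocation is capped at 15 minutes, and a large export can outlast that. A minimal sketch of a deadline-aware poll loop, assuming the standard Lambda Python context object (`context.get_remaining_time_in_millis()`); the helper name `poll_export_job` and the 30-second safety margin are hypothetical choices. The terminal statuses follow the `JobStatus` values documented for the boto3 HealthLake client.

```python
import time

# Terminal JobStatus values for a HealthLake export job
TERMINAL_STATUSES = {"COMPLETED", "COMPLETED_WITH_ERRORS", "FAILED",
                     "CANCEL_COMPLETED", "CANCEL_FAILED"}


def poll_export_job(client, datastore_id, job_id, context, poll_seconds=10, margin_ms=30_000):
    """Poll an export job, giving up before the Lambda invocation times out.

    Returns the final ExportJobProperties dict, or None if the time budget
    ran out while the job was still running.
    """
    while True:
        props = client.describe_fhir_export_job(
            DatastoreId=datastore_id, JobId=job_id
        )["ExportJobProperties"]
        if props["JobStatus"] in TERMINAL_STATUSES:
            return props
        # Stop if sleeping again could push us past the invocation deadline
        if context.get_remaining_time_in_millis() < margin_ms + poll_seconds * 1000:
            return None
        time.sleep(poll_seconds)
```

A `None` result means the job is still running; the function can then exit cleanly and leave the zipping step to a later invocation (or a longer-running orchestrator) instead of being killed mid-poll.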
Step Three: Ingestion into InterSystems OMOP
The InterSystems OMOP Cloud Service is configured to monitor our staging S3 bucket. As soon as the Lambda function uploads the ZIP of exported NDJSON files, InterSystems picks it up and begins the transformation into OMOP.
Step Four: Validation
We can verify that the data is flowing correctly by checking the ingestion logs in the InterSystems console.
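To cross-check those logs, it helps to know how many resources of each type went into the ZIP. A minimal sketch, assuming you have downloaded the ZIP the Lambda function uploaded (for example with `s3.get_object(Bucket=..., Key=...)['Body'].read()`); `count_resources_in_zip` is a hypothetical helper, and nothing here assumes a particular format for the InterSystems ingestion logs.

```python
import io
import json
import zipfile
from collections import Counter


def count_resources_in_zip(zip_bytes):
    """Count FHIR resources by resourceType across every .ndjson file in a ZIP."""
    counts = Counter()
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        for name in zf.namelist():
            if not name.endswith(".ndjson"):
                continue  # skip anything that is not an NDJSON export file
            with zf.open(name) as fh:
                for line in io.TextIOWrapper(fh, encoding="utf-8"):
                    line = line.strip()
                    if line:  # NDJSON: one JSON resource per non-empty line
                        counts[json.loads(line)["resourceType"]] += 1
    return counts
```

Comparing, say, the `Organization` count against the number of organizations reported as ingested is a quick way to spot dropped or rejected records.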
Step Five: Smokin’ Data!
Finally, we can verify that our FHIR resources (like Organizations) have been correctly mapped to OMOP tables (like care_site).
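To make the Organization-to-care_site check concrete, here is a deliberately simplified sketch of what such a mapping looks like, using the OMOP CDM v5.4 `care_site` columns. This is not the InterSystems OMOP transformation; the function name, the caller-supplied `care_site_id`, and the choice of which FHIR fields feed which columns are illustrative assumptions. `place_of_service_concept_id` is left at 0 ("No matching concept") and `location_id` empty, because resolving them properly needs vocabulary lookups and an OMOP `location` row.

```python
def organization_to_care_site(org, care_site_id):
    """Map a FHIR Organization resource (as a dict) to a simplified care_site row."""
    if org.get("resourceType") != "Organization":
        raise ValueError("expected a FHIR Organization resource")
    # Use the first type coding, if any, as the place-of-service source value
    types = org.get("type", [])
    codings = types[0].get("coding", []) if types else []
    return {
        "care_site_id": care_site_id,              # surrogate key chosen by the caller
        "care_site_name": org.get("name"),
        "place_of_service_concept_id": 0,          # 0 = "No matching concept"
        "location_id": None,                       # would reference an OMOP location row
        "care_site_source_value": org.get("id"),   # keep the FHIR id for traceability
        "place_of_service_source_value": codings[0].get("code") if codings else None,
    }
```

When checking the real output, `care_site_name` and `care_site_source_value` are the easiest columns to trace back to the original Organization resources.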
The traversal of the Strait of Messina is a success. We now have a fully automated pipeline from a native cloud FHIR store to a research-ready OMOP CDM.
Stay tuned for the next stage of the OMOP Odyssey.