Turning Datasets into RKNS Format
This guide describes how to convert a structured dataset (e.g., BIDS) into the RKNS format using the airflow-template-dataset-to-rkns template repository. The process is divided into three main phases:
- Transform Logic – Implement the conversion from raw data to RKNS (via main.py).
- Containerization – Package the logic into a portable Docker image (build.sh + Dockerfile).
- Orchestration – Run the transformation at scale using Apache Airflow.
Below, we detail each phase with practical guidance.
1. Transform Logic (main.py)
The core conversion logic lives in main.py. It reads:
- An EDF file (physiological signals),
- A TSV annotation file (onset, duration, event),
- A participants.tsv metadata table (subject-level data),
- A participants.json column dictionary (metadata codebook),
and outputs a validated .rkns file (a Zarr zip store).
CLI Interface
It is recommended to keep the CLI interface unchanged if possible—it is designed for easier Airflow usage:
python main.py \
  --input-edf <path>                 # EDF file with signal data
  --input-annotations <path>         # TSV with [onset, duration, event]
  --input-participants <path>        # participants.tsv with subject metadata
  --input-participants-json <path>   # participants.json codebook
  --output-dir <path>                # Directory for output .rkns file
  [--create-dirs]                    # Optionally create output dir if missing
Conversion Workflow
The edf_to_rkns() function executes the following steps:
1.1 Load and Standardize Signals
rkns_obj = rkns.from_external_format(
    input_file_edf,
    channel_mapping=replacement_dict,        # From assets/replace_channels.json
    exclude_channels=list(exclude_channels)
)
Uses regex mappings in assets/replace_channels.json to rename EDF channels to standardized names (e.g., “EEG(sec)” → “EEG-C3-A2”).
1.2 Add Annotations
Converts the TSV file to BIDS-compatible format and adds events:
rkns.io.csv_onsetduration_to_bids(input_file_tsv, ...)
rkns_obj.add_source_annotations(
    file_path=tmp_path,
    event_description_mapping=event_description_mapping   # From assets/event_description_mapping.json
)
1.3 Extract and Categorize Metadata
- Matches participant ID (e.g., “sub-0001”) from the EDF filename.
- Looks up the participant in participants.tsv.
- Uses participants.json to map columns to RKNS metadata categories (e.g., “demographics”, “clinical”).
- Groups metadata by category and adds each to the RKNS object.
1.4 Finalize and Export
rkns_obj.populate()                    # Build internal structure
rkns_obj.export_as_zip(output_file)    # Write Zarr zip store
1.5 Validate
validate_rkns_checksum(output_file) # Verify checksums on all data
2. Preprocessing: Generate RKNS-Compatible Event Annotations
RKNS requires events in a strict tab-separated (TSV) format with three columns: onset, duration, and event.
However, source annotations are often in XML, proprietary formats, or non-compliant TSVs (e.g., using absolute timestamps or missing durations).
You must write a preprocessing script to convert these into the required format:
onset    duration    event
0.0      30.0        stage_AASM_e1_W
30.0     30.0        stage_AASM_e1_N1
60.0     30.0        stage_AASM_e1_N2
...
Key Requirements
- onset: Time in seconds relative to EDF start (not absolute wall-clock time).
If your source uses absolute time, align it with the EDF's recording start time.
- duration: Event length in seconds.
If only onset and end are provided, compute duration = end - onset.
- event: Keep the original event label as-is—do not normalize it here.
Normalization happens later via assets/event_description_mapping.json.
Recommendation
Write this as a standalone, reusable Python script (e.g., convert_annotations_to_rkns_tsv.py). Keep it in your derived repo for reproducibility. Avoid bash—use uv for inline dependencies if needed.
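A minimal sketch of such a preprocessing script is shown below. It assumes the source annotations arrive as a TSV with absolute ''start_time'' and ''end_time'' timestamp columns; those column names and the recording start value are hypothetical and must be adapted to your dataset.

# Hypothetical sketch: convert absolute-time annotations to an RKNS-compatible TSV.
import csv
from datetime import datetime
from pathlib import Path

def convert(source_tsv: Path, output_tsv: Path, recording_start: datetime) -> None:
    """Write an [onset, duration, event] TSV from absolute-time annotations."""
    with source_tsv.open() as src, output_tsv.open("w", newline="") as dst:
        reader = csv.DictReader(src, delimiter="\t")
        writer = csv.writer(dst, delimiter="\t")
        writer.writerow(["onset", "duration", "event"])          # required header
        for row in reader:
            start = datetime.fromisoformat(row["start_time"])    # hypothetical column name
            end = datetime.fromisoformat(row["end_time"])        # hypothetical column name
            onset = (start - recording_start).total_seconds()    # seconds relative to EDF start
            duration = (end - start).total_seconds()             # duration = end - onset
            writer.writerow([onset, duration, row["event"]])     # keep original label as-is

if __name__ == "__main__":
    convert(
        Path("raw_annotations.tsv"),
        Path("sub-0001_task-sleep_eeg_events.tsv"),
        recording_start=datetime(2024, 1, 1, 22, 0, 0),          # take this from the EDF header
    )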
3. Standardizing Names via Regex Mappings
Once you have a compliant TSV, normalize channel and event names non-destructively using regex mappings defined in two JSON files.
Channel Names → assets/replace_channels.json
EDF channel labels vary wildly (e.g., “EEG C3-A2”, “EEG(sec)”). Map them to standardized RKNS names:
{
"(?i)^EEG\\(sec\\)\\s*$": "EEG-C3-A2",
"(?i)^SaO2\\s*$": "SPO2",
"(?i)^ABDO\\sRES\\s*$": "RESP-ABD",
"(?i)^THOR\\sRES\\s*$": "RESP-CHEST",
"_": "-"
}
Keys are regex patterns (applied sequentially), and values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., \1).
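The snippet below only illustrates how such a mapping behaves when the patterns are applied in order with Python's re module; the actual renaming happens inside the rkns conversion and the test scripts.

# Illustrative only: sequential application of the regex mapping.
import json
import re

with open("assets/replace_channels.json") as f:
    mapping = json.load(f)                         # JSON object order is preserved

def rename(label: str) -> str:
    for pattern, replacement in mapping.items():   # patterns applied sequentially
        label = re.sub(pattern, replacement, label)
    return label

print(rename("EEG(sec)"))     # -> "EEG-C3-A2"
print(rename("ABDO RES"))     # -> "RESP-ABD"
print(rename("EMG_Chin"))     # underscore rule -> "EMG-Chin"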
Validation
Use test_replace_channels.py to validate your mappings:
1. Extract unique channel names from your EDF files:
find /data -name "*.edf" -exec edf-peek {} \; | grep "signal_labels" | sort | uniq > assets/extracted_channels.txt
2. Run the test:
python test_replace_channels.py
3. Inspect the output:
cat out/renamed_channels.csv
AI-Assisted Mapping Generation
If you have a list of unique channel names, use this prompt with your LLM to accelerate mapping creation:
I will provide you a list of EEG channels extracted from the original EDFs.
I require an output in JSON format that maps channel names to new standardized names using grep-style regex replacements.
For example, this is a reference mapping. Note that replacements are applied in sequential order:
> {
>   "(?i)^ABDO\\sRES\\s*$": "RESP-ABD",
>   "(?i)^THOR\\sRES\\s*$": "RESP-CHEST",
>   "(?i)^EEG\\(sec\\)\\s*$": "EEG-C3-A2",
>   "_": "-"
> }
The keys are regex patterns (case-insensitive), and the values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., \1).
Based on the above example, generate a similar JSON mapping for the following list of channel names:
```
[INSERT YOUR UNIQUE CHANNEL LIST HERE]
```
Provide the output within a JSON code block.
Event Descriptions → assets/event_description_mapping.json
Normalize inconsistent event labels (e.g., “stage_AASM_e1_W” → “sleep_stage_wake”):
{
"(?i)^stage_AASM_e1_W$": "sleep_stage_wake",
"(?i)^stage_AASM_e1_N1$": "sleep_stage_n1",
"(?i)^stage_AASM_e1_N2$": "sleep_stage_n2",
"(?i)^stage_AASM_e1_N3$": "sleep_stage_n3",
"(?i)^stage_AASM_e1_R$": "sleep_stage_rem",
"^\\s*(?!sleep_stage_|arousal)\"?([^\"]+)\"?\\s*$": "[comment] \\1"
}
The last pattern acts as a catch-all: unknown events are prefixed with [comment] to prevent validation errors.
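As a quick illustration of the catch-all behavior (the negative lookahead leaves already-normalized labels untouched):

# Illustrative only: how the catch-all pattern treats unknown vs. known events.
import re

catch_all = r'^\s*(?!sleep_stage_|arousal)"?([^"]+)"?\s*$'
print(re.sub(catch_all, r"[comment] \1", "Lights Off"))        # -> "[comment] Lights Off"
print(re.sub(catch_all, r"[comment] \1", "sleep_stage_wake"))  # unchanged: lookahead blocks the match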
Validation
Use test_replace_events.py to validate your mappings:
1. Extract unique event names from your annotations:
tail -n +2 /data/**/*_events.tsv | cut -f3 | sort | uniq > assets/extracted_annotation_events.txt
2. Run the test:
python test_replace_events.py
3. Inspect the output:
cat out/renamed_events.csv
AI-Assisted Mapping Generation
Use this prompt with your LLM to accelerate mapping creation:
I will provide you a list of sleep events extracted from original annotation files.
I require an output in JSON format that maps event names to standardized names using grep-style regex replacements.
For example, this is a reference mapping. Note that replacements are applied in sequential order:
> {
>   "(?i)^stage_AASM_e1_W$": "sleep_stage_wake",
>   "(?i)^stage_AASM_e1_R$": "sleep_stage_rem",
>   "(?i)^arousals_e1_\\('arousal_standard',\\s*'EEG_C3'\\)$": "arousal_eeg_c3",
>   "^\\s*(?!sleep_stage_|arousal|apnea|hypopnea|desaturation|artifact|body_position)\"?([^\"]+)\"?\\s*$": "[comment] \\1"
> }
The keys are regex patterns (case-insensitive), and the values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., \1).
Based on the above example, generate a similar JSON mapping for the following list of event names:
```
[INSERT YOUR UNIQUE EVENT LIST HERE]
```
Provide the output within a JSON code block.
4. Metadata Handling
RKNS groups metadata by high-level categories (e.g., demographics, clinical, questionnaires) to organize heterogeneous data sources into a consistent internal structure.
To enable this, your participants.json must include a “folder” key for each column, using the original NSRR folder path (or your dataset's metadata taxonomy) as the value:
{
"age": {
"Description": "Age of participant at baseline",
"folder": "Harmonized/Demographics"
},
"BMI": {
"Description": "Body mass index",
"folder": "Anthropometry"
},
"PHQ_9_total": {
"Description": "Patient Health Questionnaire total score",
"folder": "Sleep Questionnaires/Sleep Disturbance"
}
}
Category Mapping
The script uses the category_mapping dictionary (defined at the top of main.py) to automatically map folder paths to one of these standardized categories:
- administrative – Consent, study center, visit info
- anthropometry – Height, weight, BMI
- clinical – Physical exam, vital signs
- demographics – Age, sex, race, ethnicity
- general_health – Quality of life, self-reported health
- lifestyle_and_behavioral_health – Diet, substance use, physical activity, mental health
- medical_history – Medications, comorbidities, surgical history
- questionnaires – Survey-based instruments (PHQ-9, GAD-7, ISI, etc.)
- sleep_monitoring – Polysomnography, actigraphy, oximetry, respiratory events
- treatment – CPAP, therapy, adherence
How It Works
1. The script extracts the participant ID from the EDF filename (e.g., "sub-0001" from "sub-0001_task-sleep_eeg.edf").
2. It finds the matching row in ''participants.tsv''.
3. For each column in that row, it looks up the column name in ''participants.json'' to find its ''folder''.
4. It maps the ''folder'' to a standardized category using ''category_mapping''.
5. Metadata is grouped by category and added to the RKNS object.
This abstraction allows RKNS to handle diverse datasets (NSRR, custom studies, etc.) while maintaining a uniform schema for downstream analysis.
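The sketch below illustrates this flow. It assumes pandas and shows only a hypothetical excerpt of category_mapping; the authoritative dictionary and lookup logic live in main.py.

# Illustrative sketch of the metadata lookup and grouping.
import json
import re
import pandas as pd

# Hypothetical excerpt; the real category_mapping is defined at the top of main.py.
category_mapping = {
    "Harmonized/Demographics": "demographics",
    "Anthropometry": "anthropometry",
    "Sleep Questionnaires/Sleep Disturbance": "questionnaires",
}

participants = pd.read_csv("participants.tsv", sep="\t")
with open("participants.json") as f:
    codebook = json.load(f)

subject_id = re.search(r"sub-\d+", "sub-0001_task-sleep_eeg.edf").group(0)   # "sub-0001"
row = participants.loc[participants["participant_id"] == subject_id].iloc[0]

grouped = {}
for column, value in row.items():
    folder = codebook.get(column, {}).get("folder")      # e.g. "Harmonized/Demographics"
    category = category_mapping.get(folder)
    if category:
        grouped.setdefault(category, {})[column] = value
# grouped now maps e.g. "demographics" -> {"age": ...}, ready to attach to the RKNS object.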
5. CLI: Python & Docker
Testing the Development CLI
Once you've implemented your conversion logic in main.py, test it end-to-end with your actual data:
python main.py \
  --input-edf /path/to/sub-0001_task-sleep_eeg.edf \
  --input-annotations /path/to/sub-0001_task-sleep_eeg_events.tsv \
  --input-participants /path/to/participants.tsv \
  --input-participants-json /path/to/participants.json \
  --output-dir /path/to/output \
  --create-dirs
The script will:
1. Load the EDF and apply channel mappings.
2. Load annotations and apply event mappings.
3. Extract participant metadata and group by category.
4. Validate checksums on all written data.
5. Output a ''.rkns'' file with 777 permissions.
Building and Testing the Docker Image
Build the Docker image:
chmod +x build.sh
./build.sh
This creates an image with the same CLI interface. Run it with:
docker run --rm \
  -v /path/to/data:/data \
  -v /path/to/output:/output \
  your-registry/your-image:tag \
  python main.py \
    --input-edf /data/sub-0001_task-sleep_eeg.edf \
    --input-annotations /data/sub-0001_task-sleep_eeg_events.tsv \
    --input-participants /data/participants.tsv \
    --input-participants-json /data/participants.json \
    --output-dir /output \
    --create-dirs
Security Note: Build args are visible in intermediate layer history. Avoid storing secrets (API keys, credentials) as build args; use runtime environment variables or mount secrets instead.
Testing the Docker-CLI
Build with ./build.sh (credentials stay in builder stage) and try running it on the example data with the same arguments as run_example.sh.
6. Orchestration: Airflow DAG
The provided edf_migration_dag.py serves as a template for processing all EDF files in your dataset at scale using Apache Airflow.
DAG Overview
The DAG:
1. Discovers all EDF files in a base directory.
2. For each EDF, constructs paths to its corresponding annotation, participants, and metadata files.
3. Creates a task that runs the Docker container with the appropriate arguments.
4. Collects results and validates completion.
Required Adaptations
a) Input Dataset Path
Update base_path to point to your root BIDS-like directory:
base_path = "/your/data/root"   # e.g., "/data/shhs-bids"
edf_files = find_edf_files(base_path)
b) Annotation File Naming Logic
The DAG assumes each EDF file has a corresponding annotation file. The default logic replaces .edf with _events.tsv:
tsv_file = edf_file.replace('.edf', '_events.tsv')
If your annotation files:
- Use a different naming pattern, update the derivation:
# Example: annotations in a separate 'annotations/' subfolder
tsv_file = edf_file.replace('/eeg/', '/annotations/').replace('.edf', '.tsv')
- Have inconsistent naming, implement a lookup function:
def find_annotation_file(edf_file: str, base_path: str) -> str:
    """Derive annotation file path from EDF file."""
    # Implement your custom logic here
    pass
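For example, one possible implementation, assuming the annotation file shares the EDF's filename stem but may live anywhere under the dataset root (adapt to your layout):

from pathlib import Path

def find_annotation_file(edf_file: str, base_path: str) -> str:
    """Example lookup: search the dataset for a TSV that shares the EDF's filename stem."""
    stem = Path(edf_file).stem                                   # e.g. "sub-0001_task-sleep_eeg"
    candidates = sorted(Path(base_path).rglob(f"{stem}*_events.tsv")) or \
                 sorted(Path(base_path).rglob(f"{stem}*.tsv"))
    if not candidates:
        raise FileNotFoundError(f"No annotation file found for {edf_file}")
    return str(candidates[0])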
c) Metadata File Paths
The DAG uses global participants.tsv and participants.json files. Ensure they exist at the repository root:
participants_tsv = f"{base_path}/participants.tsv"
participants_json = f"{base_path}/participants.json"
If your dataset uses per-subject or per-session metadata:
- Derive paths dynamically from the EDF filename.
- Validate that files exist before passing to the Docker task.
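For example, a sketch of such a derivation and check, assuming a hypothetical per-subject layout (variable names match the DAG snippets above):

from pathlib import Path

# Hypothetical layout: /<base_path>/<sub-XXXX>/<sub-XXXX>_participants.tsv
subject = Path(edf_file).name.split("_")[0]                    # e.g. "sub-0001"
participants_tsv = f"{base_path}/{subject}/{subject}_participants.tsv"
participants_json = f"{base_path}/participants.json"           # codebook often stays global

for path in (edf_file, tsv_file, participants_tsv, participants_json):
    if not Path(path).exists():
        raise FileNotFoundError(f"Missing input for {subject}: {path}")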
d) Docker Image Name
Update the image parameter to match your built container:
image='your-registry/your-image:tag' # Must match: docker build -t your-registry/your-image:tag .
e) Output Directory
The output path specifies where .rkns files are written:
output_dir = '/storage/rekonas-dataset-output'
Ensure:
- The path exists on the Airflow worker node (or use --create-dirs).
- The Airflow worker has write permissions.
- Sufficient disk space is available.
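A small pre-flight check along these lines can be run on the worker before triggering the DAG (illustrative only; the path is the example output directory from above):

import os
import shutil
from pathlib import Path

output_dir = Path("/storage/rekonas-dataset-output")
output_dir.mkdir(parents=True, exist_ok=True)                  # or rely on --create-dirs

if not os.access(output_dir, os.W_OK):
    raise PermissionError(f"Airflow worker cannot write to {output_dir}")
free_gb = shutil.disk_usage(output_dir).free / 1e9
print(f"{free_gb:.1f} GB free at {output_dir}")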
f) Volume Mounts
The DAG binds host directories into the container. Update both the host path and container mount point:
mounts=[{
    'source': '/absolute/path/on/host/data',   # Your actual data directory
    'target': '/data',                         # Path inside container
    'type': 'bind'
}]
Important: All --input-* and --output-dir arguments in the task command must use container paths (the target), not host paths.
Example:
command=[
    'python', 'main.py',
    '--input-edf', '/data/sub-0001_eeg.edf',               # Container path
    '--input-annotations', '/data/sub-0001_events.tsv',
    '--input-participants', '/data/participants.tsv',
    '--input-participants-json', '/data/participants.json',
    '--output-dir', '/data/output',
    '--create-dirs'
]
g) Example: Full Adaptation
Here's a complete example for a BIDS dataset stored at /mnt/bids:
from datetime import datetime
from pathlib import Path
import glob

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

base_path = "/mnt/bids"
output_dir = "/mnt/bids/derivatives/rkns"
docker_image = "my-org/edf-to-rkns:v1.0"

# Find all EDF files
edf_files = sorted(glob.glob(f"{base_path}/**/*_eeg.edf", recursive=True))

with DAG('edf_to_rkns_pipeline',
         start_date=datetime(2024, 1, 1),
         schedule_interval=None) as dag:
    for edf_file in edf_files:
        tsv_file = edf_file.replace('_eeg.edf', '_eeg.tsv')
        task = DockerOperator(
            task_id=f"convert_{Path(edf_file).stem}",
            image=docker_image,
            command=[
                'python', 'main.py',
                '--input-edf', edf_file,
                '--input-annotations', tsv_file,
                '--input-participants', f'{base_path}/participants.tsv',
                '--input-participants-json', f'{base_path}/participants.json',
                '--output-dir', output_dir,
                '--create-dirs'
            ],
            mounts=[{
                'source': base_path,
                'target': base_path,
                'type': 'bind'
            }],
            dag=dag
        )
7. Project Structure
.
├── README.md # This file
├── main.py # Core conversion logic (CLI entry point)
├── build.sh # Docker build script
├── Dockerfile # Container specification
├── pyproject.toml # Python dependencies
├── edf_migration_dag.py # Airflow DAG template
├── .devcontainer/ # Dev container config
│ ├── devcontainer.json
│ └── postStartCommand.sh
├── assets/ # Mapping files and examples
│ ├── replace_channels.json # Channel name mappings
│ ├── event_description_mapping.json # Event label mappings
│ ├── extracted_channels.txt # (Generated) Raw channel names
│ └── extracted_annotation_events.txt # (Generated) Raw event labels
├── test_replace_channels.py # Validation script for channel mappings
├── test_replace_events.py # Validation script for event mappings
└── scripts/
└── convert_annotations_to_rkns_tsv.py # (To implement) Preprocessing script
8. Workflow Summary
End-to-End Process
1. **Prepare Annotations**
   - Convert your source annotations (XML, proprietary, etc.) to RKNS-compatible TSV format.
   - Run ''convert_annotations_to_rkns_tsv.py'' on your raw data.
2. **Create Mappings**
   - Extract unique channel names and event labels from your data.
   - Use ''test_replace_channels.py'' and ''test_replace_events.py'' to validate mappings.
   - Refine ''assets/replace_channels.json'' and ''assets/event_description_mapping.json'' iteratively.
3. **Prepare Metadata**
   - Ensure ''participants.tsv'' contains all subject-level metadata.
   - Create or obtain ''participants.json'' with folder paths for each column.
   - Verify that the ''category_mapping'' in ''main.py'' covers all your folders.
4. **Test Locally**
   - Run ''python main.py --input-edf ... --output-dir ... --create-dirs'' on a sample file.
   - Verify the output ''.rkns'' file is valid and contains expected data.
5. **Build Docker Image**
   - Run ''./build.sh'' to create a portable container.
   - Test the container on the same sample file.
6. **Configure Airflow**
   - Adapt ''edf_migration_dag.py'' for your data layout.
   - Deploy to your Airflow instance.
   - Trigger the DAG to process all files.
7. **Monitor and Validate**
   - Check Airflow logs for each task.
   - Verify output ''.rkns'' files are written with correct permissions.
   - Spot-check metadata categorization and event labels.
9. Troubleshooting
EDF File Not Found
- Verify the path in --input-edf is correct and exists.
- If using Docker, ensure the path is relative to the container mount point (not the host).
Annotations File Not Found
- Check that the annotation file naming logic matches your dataset.
- Ensure the TSV is in RKNS-compatible format (three columns: onset, duration, event).
Participant Not Found in participants.tsv
- The participant ID extraction regex may not match your filename pattern.
- Update extract_sub_part() in main.py to match your naming convention:
def extract_sub_part(filepath):
    stem = Path(filepath).stem
    match = re.search(r'sub-[a-zA-Z]{4}\d+', stem)   # Adjust regex here
    if not match:
        raise ValueError(f"Could not extract subject id from {filepath}")
    return match.group(0)
Metadata Columns Not Recognized
- Verify that participants.json contains a “folder” key for each column.
- Check that the folder values are in category_mapping.
- Add missing mappings to category_mapping as needed.
Channel or Event Names Not Mapped
- Extract raw names and add them to the regex mapping JSON.
- Test with test_replace_channels.py or test_replace_events.py.
- Use LLM prompts to generate regex patterns if needed.
Permission Denied on Output File
- Ensure the output directory is writable by the process (or Airflow worker).
- Use --create-dirs to auto-create the directory with correct permissions.
Docker Build Fails
- Check that all dependencies in pyproject.toml are compatible.
- Verify the Dockerfile references the correct base image and Python version.
- Check build.sh for any environment variable or credential issues.
10. Key Files Reference
main.py
- edf_to_rkns() – Main conversion function. Orchestrates all steps.
- validate_rkns_checksum() – Validates data integrity by reading all signal blocks.
- extract_sub_part() – Extracts participant ID from the EDF filename. Customize for your naming convention.
- category_mapping – Dictionary mapping folder paths to standardized categories. Extend if your dataset uses different paths.
- parse_args() – CLI argument parser. Do not modify.
assets/replace_channels.json
- Regex patterns (keys) → standardized channel names (values).
- Applied sequentially in order.
- Examples: “EEG(sec)” → “EEG-C3-A2”, “SaO2” → “SPO2”.
assets/event_description_mapping.json
- Regex patterns (keys) → standardized event labels (values).
- Applied sequentially in order.
- Catch-all pattern: unknown events are prefixed with [comment].
participants.json
- Column codebook with Description and required folder keys.
- Folder values are mapped to categories by category_mapping in main.py.
participants.tsv
- Subject-level metadata table (BIDS-compatible).
- Rows = subjects, columns = variables (must match participants.json).
- First column should be participant_id (e.g., “sub-0001”).
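For reference, a minimal tab-separated participants.tsv consistent with the participants.json example above could look like this (values are illustrative):

participant_id    age    BMI     PHQ_9_total
sub-0001          67     27.4    5
sub-0002          54     31.2    12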
11. Contributing & Customization
This template is intentionally configurable. The main customization points are:
1. **''extract_sub_part()'' in main.py** – Update the regex if your participant ID format differs.
2. **''category_mapping'' in main.py** – Add or modify mappings if your dataset uses different folder paths.
3. **''assets/replace_channels.json''** – Adapt channel mappings to your EDF sources.
4. **''assets/event_description_mapping.json''** – Adapt event mappings to your annotation sources.
5. **''edf_migration_dag.py''** – Adjust discovery logic, paths, and Docker config for your environment.
For questions or issues, refer to the troubleshooting section or the inline code comments in main.py.
Note: To start a new dataset conversion project, create a new repository using airflow-template-dataset-to-rkns as a template. This provides access to internal Rekonas packages and a foundational workflow.