This guide describes how to convert a structured dataset (e.g., BIDS) into the RKNS format using the airflow-template-dataset-to-rkns template repository. The process is divided into three main phases:
1. Implementing the conversion logic (`main.py`).
2. Containerizing the converter (`build.sh` + `Dockerfile`).
3. Orchestrating batch processing with Airflow (`edf_migration_dag.py`).

Below, we detail each phase with practical guidance.
The core conversion logic lives in `main.py`. It reads an EDF file with signal data, a TSV annotation file, `participants.tsv`, and `participants.json`, and outputs a validated `.rkns` file (a Zarr zip store).
It is recommended to keep the CLI interface unchanged if possible—it is designed for easier Airflow usage:
```bash
python main.py \
  --input-edf <path>                 # EDF file with signal data
  --input-annotations <path>         # TSV with [onset, duration, event]
  --input-participants <path>        # participants.tsv with subject metadata
  --input-participants-json <path>   # participants.json codebook
  --output-dir <path>                # Directory for output .rkns file
  [--create-dirs]                    # Optionally create output dir if missing
```
The edf_to_rkns() function executes the following steps:
```python
rkns_obj = rkns.from_external_format(
    input_file_edf,
    channel_mapping=replacement_dict,       # From assets/replace_channels.json
    exclude_channels=list(exclude_channels)
)
```
Uses regex mappings in assets/replace_channels.json to rename EDF channels to standardized names (e.g., “EEG(sec)” → “EEG-C3-A2”).
Converts the TSV file to BIDS-compatible format and adds events:
```python
rkns.io.csv_onsetduration_to_bids(input_file_tsv, ...)
rkns_obj.add_source_annotations(
    file_path=tmp_path,
    event_description_mapping=event_description_mapping  # From assets/event_description_mapping.json
)
```
Reads the participant's row from `participants.tsv` and uses `participants.json` to map columns to RKNS metadata categories (e.g., "demographics", "clinical"). Finally, the RKNS object is built and exported:

```python
rkns_obj.populate()                   # Build internal structure
rkns_obj.export_as_zip(output_file)   # Write Zarr zip store
```
```python
validate_rkns_checksum(output_file)  # Verify checksums on all data
```
RKNS requires events in a strict tab-separated (TSV) format with three columns: onset, duration, and event.
However, source annotations are often in XML, proprietary formats, or non-compliant TSVs (e.g., using absolute timestamps or missing durations).
You must write a preprocessing script to convert these into the required format:
```
onset	duration	event
0.0	30.0	stage_AASM_e1_W
30.0	30.0	stage_AASM_e1_N1
60.0	30.0	stage_AASM_e1_N2
...
```
If your source uses absolute time, align it with the EDF's recording start time.
If only onset and end are provided, compute duration = end - onset.
Normalization happens later via assets/event_description_mapping.json.
Write this as a standalone, reusable Python script (e.g., convert_annotations_to_rkns_tsv.py). Keep it in your derived repo for reproducibility. Avoid bash—use uv for inline dependencies if needed.
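A minimal sketch of such a script, assuming the source file is a TSV with absolute `start`/`end` timestamps and an `event` column (the column names, file names, and recording start time are illustrative, not part of the template):

```python
# convert_annotations_to_rkns_tsv.py -- minimal sketch; 'start', 'end', and
# 'event' are assumed column names of the source file, not a fixed schema.
import csv
from datetime import datetime
from pathlib import Path


def convert(source_tsv: Path, output_tsv: Path, recording_start: datetime) -> None:
    """Convert annotations with absolute timestamps into [onset, duration, event]."""
    with source_tsv.open() as src, output_tsv.open("w", newline="") as dst:
        reader = csv.DictReader(src, delimiter="\t")
        writer = csv.writer(dst, delimiter="\t")
        writer.writerow(["onset", "duration", "event"])
        for row in reader:
            start = datetime.fromisoformat(row["start"])
            end = datetime.fromisoformat(row["end"])
            onset = (start - recording_start).total_seconds()  # align to EDF start time
            duration = (end - start).total_seconds()           # duration = end - onset
            writer.writerow([f"{onset:.1f}", f"{duration:.1f}", row["event"]])


if __name__ == "__main__":
    convert(Path("raw_annotations.tsv"), Path("sub-0001_events.tsv"),
            recording_start=datetime(2024, 1, 1, 22, 0, 0))  # placeholder start time
```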
Once you have a compliant TSV, normalize channel and event names non-destructively using regex mappings defined in two JSON files.
EDF channel labels vary wildly (e.g., “EEG C3-A2”, “EEG(sec)”). Map them to standardized RKNS names:
```json
{
  "(?i)^EEG\\(sec\\)\\s*$": "EEG-C3-A2",
  "(?i)^SaO2\\s*$": "SPO2",
  "(?i)^ABDO\\sRES\\s*$": "RESP-ABD",
  "(?i)^THOR\\sRES\\s*$": "RESP-CHEST",
  "_": "-"
}
```
Keys are regex patterns (applied sequentially), and values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., \1).
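For reference, the sequential application of these rules can be reproduced with a few lines of Python. This mirrors the behavior described above; the actual implementation inside `rkns` may differ:

```python
import json
import re


def apply_mappings(name: str, mapping: dict[str, str]) -> str:
    """Apply each regex replacement in order; later rules see the result of earlier ones."""
    for pattern, replacement in mapping.items():
        name = re.sub(pattern, replacement, name)
    return name


with open("assets/replace_channels.json") as f:
    mapping = json.load(f)

print(apply_mappings("EEG(sec)", mapping))   # -> "EEG-C3-A2" with the example rules above
print(apply_mappings("ABDO RES", mapping))   # -> "RESP-ABD"
```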
Use test_replace_channels.py to validate your mappings:
1. Extract unique channel names from your EDF files:

   ```bash
   find /data -name "*.edf" -exec edf-peek {} \; | grep "signal_labels" | sort | uniq > assets/extracted_channels.txt
   ```

2. Run the test:

   ```bash
   python test_replace_channels.py
   ```

3. Inspect the output:

   ```bash
   cat out/renamed_channels.csv
   ```
If you have a list of unique channel names, use this prompt with your LLM to accelerate mapping creation:
I will provide you a list of physiological signal channels (e.g., EEG, EOG, EMG, respiratory, cardiac) extracted from original EDF files.
I require an output in JSON format that maps each raw channel name to a standardized name using **sequential, grep-style regex replacements**.
Key requirements:
1. **Include early normalization rules** to handle common delimiters (e.g., `_`, spaces, `.`, `/`, parentheses) by converting them to hyphens (`-`), collapsing multiple hyphens, and trimming leading/trailing hyphens.
2. All patterns must be **case-insensitive** (use `(?i)`).
3. Use **physiologically meaningful, NSRR/AASM-aligned names**, such as:
- `EEG-C3-M2` (not `EEG-C3_A2` or ambiguous forms)
- `EMG-LLEG` / `EMG-RLEG` for leg EMG (not `LAT`/`RAT` as position)
- `RESP-AIRFLOW-THERM` or `RESP-AIRFLOW-PRES` (not generic `RESP-NASAL`)
- `EOG-LOC` / `EOG-ROC` for eye channels
- `EMG-CHIN` for chin EMG
- `PULSE` for heart rate or pulse signals (unless raw ECG → `ECG`)
4. **Do not include a final catch-all rule** (e.g., `^(.+)$ → MISC-\1`) unless explicitly requested—most channels in the input list should be known and mapped specifically.
5. Replacements are applied **in order**, with each rule operating on the result of the previous one.
Example reference snippet:
```json
{
"(?i)[\\s_\\./\\(\\),]+": "-",
"-+": "-",
"^-|-$": "",
"(?i)^abdomen$": "RESP-ABD",
"(?i)^c3_m2$": "EEG-C3-M2",
"(?i)^lat$": "EMG-LLEG"
}
```
Now, generate a similar JSON mapping for the following list of channel names:
```
[INSERT YOUR UNIQUE CHANNEL LIST HERE]
```
Provide the output **within a JSON code block only**—no explanations.
Normalize inconsistent event labels (e.g., “stage_AASM_e1_W” → “sleep_stage_wake”):
```json
{
  "(?i)^stage_AASM_e1_W$": "sleep_stage_wake",
  "(?i)^stage_AASM_e1_N1$": "sleep_stage_n1",
  "(?i)^stage_AASM_e1_N2$": "sleep_stage_n2",
  "(?i)^stage_AASM_e1_N3$": "sleep_stage_n3",
  "(?i)^stage_AASM_e1_R$": "sleep_stage_rem",
  "^\\s*(?!sleep_stage_|arousal)\"?([^\"]+)\"?\\s*$": "[comment] \\1"
}
```
The last pattern acts as a catch-all: unknown events are prefixed with [comment] to prevent validation errors.
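As a quick illustrative check of this catch-all behavior (using Python's `re` directly, outside the template):

```python
import re

# Same catch-all pattern as above: negative lookahead skips known prefixes.
pattern = r'^\s*(?!sleep_stage_|arousal)"?([^"]+)"?\s*$'

print(re.sub(pattern, r"[comment] \1", "Lights Off"))        # -> "[comment] Lights Off"
print(re.sub(pattern, r"[comment] \1", "sleep_stage_wake"))  # unchanged: lookahead blocks the match
```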
Use test_replace_events.py to validate your mappings:
1. Extract unique event names from your annotations:

   ```bash
   tail -n +2 /data/**/*_events.tsv | cut -f3 | sort | uniq > assets/extracted_annotation_events.txt
   ```

2. Run the test:

   ```bash
   python test_replace_events.py
   ```

3. Inspect the output:

   ```bash
   cat out/renamed_events.csv
   ```
Use this prompt with your LLM to accelerate mapping creation:
I will provide you a list of sleep events extracted from original annotation files.
I require an output in JSON format that maps event names to standardized names using grep-style regex replacements.
For example, this is a reference mapping. Note that replacements are applied in sequential order:
```json
{
"(?i)^stage_AASM_e1_W$": "sleep_stage_wake",
"(?i)^stage_AASM_e1_R$": "sleep_stage_rem",
"(?i)^arousals_e1_\\('arousal_standard',\\s*'EEG_C3'\\)$": "arousal_eeg_c3",
"^\\s*(?!sleep_stage_|arousal|apnea|hypopnea|desaturation|artifact|body_position)\"?([^\"]+)\"?\\s*$": "[comment] \\1"
}
```
The keys are regex patterns (case-insensitive), and the values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., `\1`).
Based on the above example, generate a similar JSON mapping for the following list of event names:
```
[INSERT YOUR UNIQUE EVENT LIST HERE]
```
Provide the output within a JSON code block.
RKNS groups metadata by high-level categories (e.g., demographics, clinical, questionnaires) to organize heterogeneous data sources into a consistent internal structure.
To enable this, your participants.json must include a “folder” key for each column, using the original NSRR folder path (or your dataset's metadata taxonomy) as the value:
```json
{
  "age": {
    "Description": "Age of participant at baseline",
    "folder": "Harmonized/Demographics"
  },
  "BMI": {
    "Description": "Body mass index",
    "folder": "Anthropometry"
  },
  "PHQ_9_total": {
    "Description": "Patient Health Questionnaire total score",
    "folder": "Sleep Questionnaires/Sleep Disturbance"
  }
}
```
The script uses the category_mapping dictionary (defined at the top of main.py) to automatically map folder paths to one of these standardized categories:
- `administrative` – Consent, study center, visit info
- `anthropometry` – Height, weight, BMI
- `clinical` – Physical exam, vital signs
- `demographics` – Age, sex, race, ethnicity
- `general_health` – Quality of life, self-reported health
- `lifestyle_and_behavioral_health` – Diet, substance use, physical activity, mental health
- `medical_history` – Medications, comorbidities, surgical history
- `questionnaires` – Survey-based instruments (PHQ-9, GAD-7, ISI, etc.)
- `sleep_monitoring` – Polysomnography, actigraphy, oximetry, respiratory events
- `treatment` – CPAP, therapy, adherence

The lookup proceeds as follows (a minimal sketch of this flow appears after the list):

1. The script extracts the participant ID from the EDF filename (e.g., "sub-0001" from "sub-0001_task-sleep_eeg.edf").
2. It finds the matching row in `participants.tsv`.
3. For each column in that row, it looks up the column name in `participants.json` to find its `folder`.
4. It maps the `folder` to a standardized category using `category_mapping`.
5. Metadata is grouped by category and added to the RKNS object.
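The sketch below re-creates this lookup flow in plain Python. Names such as `category_mapping` come from `main.py`; the helper function itself is illustrative and may differ from the actual implementation:

```python
import csv
import json
import re
from pathlib import Path


def metadata_for_edf(edf_path, participants_tsv, participants_json, category_mapping):
    """Illustrative re-creation of the five lookup steps described above."""
    sub_id = re.search(r"sub-[A-Za-z0-9]+", Path(edf_path).stem).group(0)   # step 1: participant ID
    with open(participants_tsv) as f:
        row = next(r for r in csv.DictReader(f, delimiter="\t")
                   if r["participant_id"] == sub_id)                        # step 2: matching row
    with open(participants_json) as f:
        codebook = json.load(f)
    grouped = {}
    for column, value in row.items():
        folder = codebook.get(column, {}).get("folder")                     # step 3: column -> folder
        category = category_mapping.get(folder)                             # step 4: folder -> category
        if folder is None or category is None:
            continue
        grouped.setdefault(category, {})[column] = value                    # step 5: group by category
    return grouped
```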
This abstraction allows RKNS to handle diverse datasets (NSRR, custom studies, etc.) while maintaining a uniform schema for downstream analysis.
Once you've implemented your conversion logic in main.py, test it end-to-end with your actual data:
```bash
python main.py \
  --input-edf /path/to/sub-0001_task-sleep_eeg.edf \
  --input-annotations /path/to/sub-0001_task-sleep_eeg_events.tsv \
  --input-participants /path/to/participants.tsv \
  --input-participants-json /path/to/participants.json \
  --output-dir /path/to/output \
  --create-dirs
```
The script will:
1. Load the EDF and apply channel mappings.
2. Load annotations and apply event mappings.
3. Extract participant metadata and group by category.
4. Validate checksums on all written data.
5. Output a `.rkns` file with 777 permissions.
Build the Docker image:
```bash
chmod +x build.sh
./build.sh
```
This creates an image with the same CLI interface. Run it with:
```bash
docker run --rm \
  -v /path/to/data:/data \
  -v /path/to/output:/output \
  your-registry/your-image:tag \
  python main.py \
  --input-edf /data/sub-0001_task-sleep_eeg.edf \
  --input-annotations /data/sub-0001_task-sleep_eeg_events.tsv \
  --input-participants /data/participants.tsv \
  --input-participants-json /data/participants.json \
  --output-dir /output \
  --create-dirs
```
Security Note: Build args are visible in intermediate layer history. Avoid storing secrets (API keys, credentials) as build args; use runtime environment variables or mount secrets instead.
Build with ./build.sh (credentials stay in builder stage) and try running it on the example data with the same arguments as run_example.sh.
The provided edf_migration_dag.py serves as a template for processing all EDF files in your dataset at scale using Apache Airflow.
The DAG:
1. Discovers all EDF files in a base directory.
2. For each EDF, constructs paths to its corresponding annotation, participants, and metadata files.
3. Creates a task that runs the Docker container with the appropriate arguments.
4. Collects results and validates completion.
Update base_path to point to your root BIDS-like directory:
```python
base_path = "/your/data/root"  # e.g., "/data/shhs-bids"
edf_files = find_edf_files(base_path)
```
The DAG assumes each EDF file has a corresponding annotation file. The default logic replaces .edf with _events.tsv:
```python
tsv_file = edf_file.replace('.edf', '_events.tsv')
```
If your annotation files follow a different naming scheme or live in a different directory, adjust this logic accordingly:

```python
# Example: annotations in a separate 'annotations/' subfolder
tsv_file = edf_file.replace('/eeg/', '/annotations/').replace('.edf', '.tsv')
```

For more complex layouts, factor the logic into a helper:

```python
def find_annotation_file(edf_file: str, base_path: str) -> str:
    """Derive the annotation file path from an EDF file path."""
    # Implement your custom logic here
    pass
```
The DAG uses global participants.tsv and participants.json files. Ensure they exist at the repository root:
```python
participants_tsv = f"{base_path}/participants.tsv"
participants_json = f"{base_path}/participants.json"
```
If your dataset uses per-subject or per-session metadata, derive these paths from each EDF file instead of using the global files; see the sketch below.
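A small helper along these lines could derive per-subject paths. The layout assumed here (sidecar files sitting next to each EDF) is purely illustrative:

```python
from pathlib import Path


def per_subject_metadata(edf_file: str) -> tuple[str, str]:
    """Derive per-subject sidecar paths located next to the EDF (assumed layout)."""
    subject_dir = Path(edf_file).parent
    return (str(subject_dir / "participants.tsv"),
            str(subject_dir / "participants.json"))
```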
Update the image parameter to match your built container:
```python
image='your-registry/your-image:tag'
# Must match: docker build -t your-registry/your-image:tag .
```
The output path specifies where .rkns files are written:
```python
output_dir = '/storage/rekonas-dataset-output'
```
Ensure the output directory exists on the host, or pass `--create-dirs` so the script creates it automatically.

The DAG binds host directories into the container. Update both the host path and the container mount point:
```python
mounts=[{
    'source': '/absolute/path/on/host/data',  # Your actual data directory
    'target': '/data',                        # Path inside container
    'type': 'bind'
}]
```
Important: All `--input-*` and `--output-dir` arguments in the task command must use container paths (the `target`), not host paths.
Example:
```python
command=[
    'python', 'main.py',
    '--input-edf', '/data/sub-0001_eeg.edf',  # Container path
    '--input-annotations', '/data/sub-0001_events.tsv',
    '--input-participants', '/data/participants.tsv',
    '--input-participants-json', '/data/participants.json',
    '--output-dir', '/data/output',
    '--create-dirs'
]
```
Here's a complete example for a BIDS dataset stored at /mnt/bids:
```python
import glob
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

base_path = "/mnt/bids"
output_dir = "/mnt/bids/derivatives/rkns"
docker_image = "my-org/edf-to-rkns:v1.0"

# Find all EDF files
edf_files = sorted(glob.glob(f"{base_path}/**/*_eeg.edf", recursive=True))

with DAG('edf_to_rkns_pipeline',
         start_date=datetime(2024, 1, 1),
         schedule_interval=None) as dag:
    for edf_file in edf_files:
        tsv_file = edf_file.replace('_eeg.edf', '_eeg.tsv')
        task = DockerOperator(
            task_id=f"convert_{Path(edf_file).stem}",
            image=docker_image,
            command=[
                'python', 'main.py',
                '--input-edf', edf_file,
                '--input-annotations', tsv_file,
                '--input-participants', f'{base_path}/participants.tsv',
                '--input-participants-json', f'{base_path}/participants.json',
                '--output-dir', output_dir,
                '--create-dirs'
            ],
            mounts=[{
                'source': base_path,
                'target': base_path,
                'type': 'bind'
            }],
            dag=dag
        )
```
```
.
├── README.md                           # This file
├── main.py                             # Core conversion logic (CLI entry point)
├── build.sh                            # Docker build script
├── Dockerfile                          # Container specification
├── pyproject.toml                      # Python dependencies
├── edf_migration_dag.py                # Airflow DAG template
├── .devcontainer/                      # Dev container config
│   ├── devcontainer.json
│   └── postStartCommand.sh
├── assets/                             # Mapping files and examples
│   ├── replace_channels.json           # Channel name mappings
│   ├── event_description_mapping.json  # Event label mappings
│   ├── extracted_channels.txt          # (Generated) Raw channel names
│   └── extracted_annotation_events.txt # (Generated) Raw event labels
├── test_replace_channels.py            # Validation script for channel mappings
├── test_replace_events.py              # Validation script for event mappings
└── scripts/
    └── convert_annotations_to_rkns_tsv.py  # (To implement) Preprocessing script
```
1. **Prepare Annotations**
   - Convert your source annotations (XML, proprietary, etc.) to RKNS-compatible TSV format.
   - Run `convert_annotations_to_rkns_tsv.py` on your raw data.
2. **Create Mappings**
   - Extract unique channel names and event labels from your data.
   - Use `test_replace_channels.py` and `test_replace_events.py` to validate mappings.
   - Refine `assets/replace_channels.json` and `assets/event_description_mapping.json` iteratively.
3. **Prepare Metadata**
   - Ensure `participants.tsv` contains all subject-level metadata.
   - Create or obtain `participants.json` with folder paths for each column.
   - Verify that the `category_mapping` in `main.py` covers all your folders.
4. **Test Locally**
   - Run `python main.py --input-edf ... --output-dir ... --create-dirs` on a sample file.
   - Verify the output `.rkns` file is valid and contains expected data.
5. **Build Docker Image**
   - Run `./build.sh` to create a portable container.
   - Test the container on the same sample file.
6. **Configure Airflow**
   - Adapt `edf_migration_dag.py` for your data layout.
   - Deploy to your Airflow instance.
   - Trigger the DAG to process all files.
7. **Monitor and Validate**
   - Check Airflow logs for each task.
   - Verify output `.rkns` files are written with correct permissions.
   - Spot-check metadata categorization and event labels.
- Check that the path passed to `--input-edf` is correct and the file exists.
- Adjust `extract_sub_part()` in `main.py` to match your naming convention:

```python
def extract_sub_part(filepath):
    stem = Path(filepath).stem
    match = re.search(r'sub-[a-zA-Z]{4}\d+', stem)  # Adjust regex here
    if not match:
        raise ValueError(f"Could not extract subject id from {filepath}")
    return match.group(0)
```
Also check that:

- `participants.json` contains a "folder" key for each column.
- All `folder` values appear in `category_mapping`; extend `category_mapping` as needed.
- Mappings validate cleanly with `test_replace_channels.py` or `test_replace_events.py`.
- `--create-dirs` is passed to auto-create the output directory with correct permissions.
- The dependencies in `pyproject.toml` are compatible.
- The `Dockerfile` references the correct base image and Python version.
- `build.sh` runs without environment variable or credential issues.

Key functions and objects in `main.py`:

- `edf_to_rkns()` – Main conversion function. Orchestrates all steps.
- `validate_rkns_checksum()` – Validates data integrity by reading all signal blocks.
- `extract_sub_part()` – Extracts participant ID from EDF filename. Customize for your naming convention.
- `category_mapping` – Dictionary mapping folder paths to standardized categories. Extend if your dataset uses different paths.
- `parse_args()` – CLI argument parser. Do not modify.

Further notes:

- Unknown event labels are prefixed with `[comment]` rather than rejected.
- Each column entry in `participants.json` needs a `Description` and the required `folder` key.
- Folder paths (declared in `participants.json`) are mapped to categories via `category_mapping` in `main.py`.
- The participant ID extracted from the EDF filename must match a `participant_id` in `participants.tsv` (e.g., "sub-0001").

This template is intentionally configurable. The main customization points are:
- `extract_sub_part()` in `main.py` – Update the regex if your participant ID format differs.
- `category_mapping` in `main.py` – Add or modify mappings if your dataset uses different folder paths.
- `assets/replace_channels.json` – Adapt channel mappings to your EDF sources.
- `assets/event_description_mapping.json` – Adapt event mappings to your annotation sources.
- `edf_migration_dag.py` – Adjust discovery logic, paths, and Docker config for your environment.
For questions or issues, refer to the troubleshooting section or the inline code comments in main.py.
Note: To start a new dataset conversion project, create a new repository using airflow-template-dataset-to-rkns as a template. This provides access to internal Rekonas packages and a foundational workflow.