

Turning Datasets into RKNS Format

This guide describes how to convert a structured dataset (e.g., BIDS) into the RKNS format using the airflow-template-dataset-to-rkns template repository. The process is divided into three main phases:

  1. Transform Logic – Implement the conversion from raw data to RKNS (via main.py).
  2. Containerization – Package the logic into a portable Docker image (build.sh + Dockerfile).
  3. Orchestration – Run the transformation at scale using Apache Airflow.

Below, we detail each phase with practical guidance.


1. Transform Logic (main.py)

The core conversion logic lives in main.py. It reads:

  • An EDF file (physiological signals),
  • A TSV annotation file (onset, duration, event),
  • A participants.tsv metadata table (subject-level data),
  • A participants.json column dictionary (metadata codebook),

and outputs a validated .rkns file (a Zarr zip store).

It is recommended to keep the CLI interface unchanged if possible; it is designed to integrate smoothly with the Airflow DAG:

python main.py \
  --input-edf <path>                    # EDF file with signal data
  --input-annotations <path>            # TSV with [onset, duration, event]
  --input-participants <path>           # participants.tsv with subject metadata
  --input-participants-json <path>      # participants.json codebook
  --output-dir <path>                   # Directory for output .rkns file
  [--create-dirs]                       # Optionally create output dir if missing

The edf_to_rkns() function executes the following steps:

1.1 Load and Standardize Signals

rkns_obj = rkns.from_external_format(
    input_file_edf, 
    channel_mapping=replacement_dict,  # From assets/replace_channels.json
    exclude_channels=list(exclude_channels)
)

Uses regex mappings in assets/replace_channels.json to rename EDF channels to standardized names (e.g., “EEG(sec)” → “EEG-C3-A2”).

1.2 Add Annotations

Converts the TSV annotation file to a BIDS-compatible format and adds the events:

rkns.io.csv_onsetduration_to_bids(input_file_tsv, ...)
rkns_obj.add_source_annotations(
    file_path=tmp_path, 
    event_description_mapping=event_description_mapping  # From assets/event_description_mapping.json
)

1.3 Add Participant Metadata

  1. Matches the participant ID (e.g., “sub-0001”) from the EDF filename.
  2. Looks up the participant in participants.tsv.
  3. Uses participants.json to map columns to RKNS metadata categories (e.g., “demographics”, “clinical”).
  4. Groups the metadata by category and adds each group to the RKNS object.

1.4 Populate, Export, and Validate

rkns_obj.populate()                     # Build internal structure
rkns_obj.export_as_zip(output_file)     # Write Zarr zip store
validate_rkns_checksum(output_file)     # Verify checksums on all data

2. Preprocessing: Generate RKNS-Compatible Event Annotations

RKNS requires events in a strict tab-separated (TSV) format with three columns: onset, duration, and event. However, source annotations are often in XML, proprietary formats, or non-compliant TSVs (e.g., using absolute timestamps or missing durations).

You must write a preprocessing script to convert these into the required format:

onset	duration	event
0.0	30.0	stage_AASM_e1_W
30.0	30.0	stage_AASM_e1_N1
60.0	30.0	stage_AASM_e1_N2
...
  1. onset: Time in seconds relative to the EDF start (not absolute wall-clock time). If your source uses absolute time, align it with the EDF's recording start time.
  2. duration: Event length in seconds. If only onset and end are provided, compute duration = end - onset.
  3. event: Keep the original event label as-is; do not normalize it here. Normalization happens later via assets/event_description_mapping.json.

Write this as a standalone, reusable Python script (e.g., convert_annotations_to_rkns_tsv.py). Keep it in your derived repo for reproducibility. Avoid bash; use uv for inline dependencies if needed.
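
As a starting point, here is a minimal sketch of such a script. It assumes a hypothetical source CSV with absolute start_time/end_time columns and a hardcoded recording start; in practice, read the recording start from the EDF header and adapt the parsing to your annotation format:

"""convert_annotations_to_rkns_tsv.py -- illustrative sketch, not the template's actual script."""
import csv
from datetime import datetime

EDF_START = datetime(2024, 1, 1, 22, 0, 0)  # assumed recording start; read it from the EDF header in practice

def convert(src_csv: str, dst_tsv: str) -> None:
    with open(src_csv, newline="") as src, open(dst_tsv, "w", newline="") as dst:
        writer = csv.writer(dst, delimiter="\t")
        writer.writerow(["onset", "duration", "event"])
        for row in csv.DictReader(src):
            start = datetime.fromisoformat(row["start_time"])  # hypothetical source columns
            end = datetime.fromisoformat(row["end_time"])
            onset = (start - EDF_START).total_seconds()         # align to EDF start
            duration = (end - start).total_seconds()            # duration = end - onset
            writer.writerow([f"{onset:.1f}", f"{duration:.1f}", row["event"]])  # keep the label as-is

if __name__ == "__main__":
    convert("raw_annotations.csv", "sub-0001_events.tsv")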


3. Channel & Event Name Mapping

Once you have a compliant TSV, normalize channel and event names non-destructively using regex mappings defined in two JSON files.

Channel Names → assets/replace_channels.json

EDF channel labels vary wildly (e.g., “EEG C3-A2”, “EEG(sec)”). Map them to standardized RKNS names:

{
  "(?i)^EEG\\(sec\\)\\s*$": "EEG-C3-A2",
  "(?i)^SaO2\\s*$": "SPO2",
  "(?i)^ABDO\\sRES\\s*$": "RESP-ABD",
  "(?i)^THOR\\sRES\\s*$": "RESP-CHEST",
  "_": "-"
}

Keys are regex patterns (applied sequentially), and values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., \1).
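
To see how the mapping behaves, you can apply it the same way the conversion does, pattern by pattern in order. This is a simplified illustration (the real renaming happens inside rkns via channel_mapping); the same sequential logic also applies to the event mapping described below:

import json
import re

def apply_mapping(name: str, mapping: dict[str, str]) -> str:
    """Apply each regex -> replacement pair in insertion order (simplified illustration)."""
    for pattern, replacement in mapping.items():
        name = re.sub(pattern, replacement, name)
    return name

with open("assets/replace_channels.json") as f:
    channel_mapping = json.load(f)

print(apply_mapping("EEG(sec)", channel_mapping))   # -> "EEG-C3-A2"
print(apply_mapping("ABDO RES", channel_mapping))   # -> "RESP-ABD"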

Use test_replace_channels.py to validate your mappings:

1. Extract unique channel names from your EDF files:

find /data -name "*.edf" -exec edf-peek {} \; | grep "signal_labels" | sort | uniq > assets/extracted_channels.txt

2. Run the test:

python test_replace_channels.py

3. Inspect the output:

cat out/renamed_channels.csv

If you have a list of unique channel names, use this prompt with your LLM to accelerate mapping creation:

I will provide you a list of EEG channels extracted from the original EDFs.  
I require an output in JSON format that maps channel names to new standardized names using grep-style regex replacements.  

For example, this is a reference mapping. Note that replacements are applied in sequential order:
```
{
  "(?i)^ABDO\\sRES\\s*$": "RESP-ABD",
  "(?i)^THOR\\sRES\\s*$": "RESP-CHEST",
  "(?i)^EEG\\(sec\\)\\s*$": "EEG-C3-A2",
  "_": "-"
}
```

The keys are regex patterns (case-insensitive), and the values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., ''\1'').

Based on the above example, generate a similar JSON mapping for the following list of channel names:
```
[INSERT YOUR UNIQUE CHANNEL LIST HERE]
```

Provide the output within a JSON code block.

Event Labels → assets/event_description_mapping.json

Normalize inconsistent event labels (e.g., “stage_AASM_e1_W” → “sleep_stage_wake”):

{
  "(?i)^stage_AASM_e1_W$": "sleep_stage_wake",
  "(?i)^stage_AASM_e1_N1$": "sleep_stage_n1",
  "(?i)^stage_AASM_e1_N2$": "sleep_stage_n2",
  "(?i)^stage_AASM_e1_N3$": "sleep_stage_n3",
  "(?i)^stage_AASM_e1_R$": "sleep_stage_rem",
  "^\\s*(?!sleep_stage_|arousal)\"?([^\"]+)\"?\\s*$": "[comment] \\1"
}

The last pattern acts as a catch-all: unknown events are prefixed with [comment] to prevent validation errors.

Use test_replace_events.py to validate your mappings:

1. Extract unique event names from your annotations:

tail -q -n +2 /data/**/*_events.tsv | cut -f3 | sort | uniq > assets/extracted_annotation_events.txt

2. Run the test:

python test_replace_events.py

3. Inspect the output:

cat out/renamed_events.csv

Use this prompt with your LLM to accelerate mapping creation:

I will provide you a list of sleep events extracted from original annotation files.  
I require an output in JSON format that maps event names to standardized names using grep-style regex replacements.  

For example, this is a reference mapping. Note that replacements are applied in sequential order:
```json
{
  "(?i)^stage_AASM_e1_W$": "sleep_stage_wake",
  "(?i)^stage_AASM_e1_R$": "sleep_stage_rem",
  "(?i)^arousals_e1_\\('arousal_standard',\\s*'EEG_C3'\\)$": "arousal_eeg_c3",
  "^\\s*(?!sleep_stage_|arousal|apnea|hypopnea|desaturation|artifact|body_position)\"?([^\"]+)\"?\\s*$": "[comment] \\1"
}
```

The keys are regex patterns (case-insensitive), and the values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., ''\1'').

Based on the above example, generate a similar JSON mapping for the following list of event names:
```
[INSERT YOUR UNIQUE EVENT LIST HERE]
```
 
Provide the output within a JSON code block.

4. Metadata Handling

RKNS groups metadata by high-level categories (e.g., demographics, clinical, questionnaires) to organize heterogeneous data sources into a consistent internal structure.

To enable this, your participants.json must include a “folder” key for each column, using the original NSRR folder path (or your dataset's metadata taxonomy) as the value:

{
  "age": {
    "Description": "Age of participant at baseline",
    "folder": "Harmonized/Demographics"
  },
  "BMI": {
    "Description": "Body mass index",
    "folder": "Anthropometry"
  },
  "PHQ_9_total": {
    "Description": "Patient Health Questionnaire total score",
    "folder": "Sleep Questionnaires/Sleep Disturbance"
  }
}
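
For reference, a matching participants.tsv might look like this (illustrative values; column names must match the keys in participants.json, and the first column must be participant_id):

participant_id	age	BMI	PHQ_9_total
sub-0001	63	27.4	5
sub-0002	58	31.2	12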

The script uses the category_mapping dictionary (defined at the top of main.py) to automatically map folder paths to one of these standardized categories:

  • administrative – Consent, study center, visit info
  • anthropometry – Height, weight, BMI
  • clinical – Physical exam, vital signs
  • demographics – Age, sex, race, ethnicity
  • general_health – Quality of life, self-reported health
  • lifestyle_and_behavioral_health – Diet, substance use, physical activity, mental health
  • medical_history – Medications, comorbidities, surgical history
  • questionnaires – Survey-based instruments (PHQ-9, GAD-7, ISI, etc.)
  • sleep_monitoring – Polysomnography, actigraphy, oximetry, respiratory events
  • treatment – CPAP, therapy, adherence

At runtime, the metadata flow is:

1. The script extracts the participant ID from the EDF filename (e.g., "sub-0001" from "sub-0001_task-sleep_eeg.edf").
2. It finds the matching row in ''participants.tsv''.
3. For each column in that row, it looks up the column name in ''participants.json'' to find its ''folder''.
4. It maps the ''folder'' to a standardized category using ''category_mapping''.
5. Metadata is grouped by category and added to the RKNS object.

This abstraction allows RKNS to handle diverse datasets (NSRR, custom studies, etc.) while maintaining a uniform schema for downstream analysis.
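
The sketch below illustrates this grouping logic with hypothetical helper names and an abbreviated category_mapping; the actual implementation lives in edf_to_rkns() in main.py:

import csv
import re
from pathlib import Path

# Abbreviated example; the full dictionary sits at the top of main.py.
category_mapping = {
    "Harmonized/Demographics": "demographics",
    "Anthropometry": "anthropometry",
    "Sleep Questionnaires/Sleep Disturbance": "questionnaires",
}

def group_metadata(edf_path: str, participants_tsv: str, codebook: dict) -> dict[str, dict]:
    """Group one subject's metadata columns by RKNS category (illustrative sketch)."""
    subject = re.search(r"sub-[A-Za-z0-9]+", Path(edf_path).stem).group(0)  # e.g., "sub-0001"
    with open(participants_tsv, newline="") as f:
        row = next(r for r in csv.DictReader(f, delimiter="\t") if r["participant_id"] == subject)
    grouped: dict[str, dict] = {}
    for column, value in row.items():
        folder = codebook.get(column, {}).get("folder")
        category = category_mapping.get(folder)
        if category:
            grouped.setdefault(category, {})[column] = value
    return grouped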


5. CLI: Python & Docker

Once you've implemented your conversion logic in main.py, test it end-to-end with your actual data:

python main.py \
  --input-edf /path/to/sub-0001_task-sleep_eeg.edf \
  --input-annotations /path/to/sub-0001_task-sleep_eeg_events.tsv \
  --input-participants /path/to/participants.tsv \
  --input-participants-json /path/to/participants.json \
  --output-dir /path/to/output \
  --create-dirs

The script will:

1. Load the EDF and apply channel mappings.
2. Load annotations and apply event mappings.
3. Extract participant metadata and group by category.
4. Validate checksums on all written data.
5. Output a ''.rkns'' file with 777 permissions.
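
To spot-check the result, remember that a .rkns file is a Zarr zip store, i.e. a regular zip archive; listing its entries with the standard library is a quick sanity check (the output path below is illustrative):

import zipfile

with zipfile.ZipFile("/path/to/output/sub-0001.rkns") as zf:
    for name in zf.namelist()[:20]:   # first entries: groups, arrays, and metadata written by export_as_zip()
        print(name)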

Build the Docker image:

chmod +x build.sh
./build.sh

This creates an image with the same CLI interface. Run it with:

docker run --rm \
  -v /path/to/data:/data \
  -v /path/to/output:/output \
  your-registry/your-image:tag \
  python main.py \
    --input-edf /data/sub-0001_task-sleep_eeg.edf \
    --input-annotations /data/sub-0001_task-sleep_eeg_events.tsv \
    --input-participants /data/participants.tsv \
    --input-participants-json /data/participants.json \
    --output-dir /output \
    --create-dirs

Security Note: Build args are visible in intermediate layer history. Avoid storing secrets (API keys, credentials) as build args; use runtime environment variables or mount secrets instead, and keep build logs out of public CI output.

After building with ./build.sh (credentials stay in the builder stage), try running the container on the example data with the same arguments as run_example.sh.


6. Orchestration: Airflow DAG

The provided edf_migration_dag.py serves as a template for processing all EDF files in your dataset at scale using Apache Airflow.

The DAG:

1. Discovers all EDF files in a base directory.
2. For each EDF, constructs paths to its corresponding annotation, participants, and metadata files.
3. Creates a task that runs the Docker container with the appropriate arguments.
4. Collects results and validates completion.

a) Input Dataset Path

Update base_path to point to your root BIDS-like directory:

base_path = "/your/data/root"  # e.g., "/data/shhs-bids"
edf_files = find_edf_files(base_path)
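
If you need to adapt the discovery step, find_edf_files() presumably does something close to the following recursive glob (an assumption; check the template's actual helper):

import glob

def find_edf_files(base_path: str) -> list[str]:
    """Recursively collect all EDF files below base_path (illustrative sketch)."""
    return sorted(glob.glob(f"{base_path}/**/*.edf", recursive=True))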

b) Annotation File Naming Logic

The DAG assumes each EDF file has a corresponding annotation file. The default logic replaces .edf with _events.tsv:

tsv_file = edf_file.replace('.edf', '_events.tsv')

If your annotation files:

  • Use a different naming pattern, update the derivation:

# Example: annotations in a separate 'annotations/' subfolder
tsv_file = edf_file.replace('/eeg/', '/annotations/').replace('.edf', '.tsv')

  • Have inconsistent naming, implement a lookup function, for example:

def find_annotation_file(edf_file: str, base_path: str) -> str:
    """Locate the annotation file for an EDF file (example: recursive glob lookup)."""
    candidate = Path(edf_file).stem + "_events.tsv"   # requires glob and pathlib.Path imports
    matches = glob.glob(f"{base_path}/**/{candidate}", recursive=True)
    if not matches:
        raise FileNotFoundError(f"No annotation file found for {edf_file}")
    return matches[0]

c) Metadata File Paths

The DAG uses global participants.tsv and participants.json files. Ensure they exist at the repository root:

participants_tsv = f"{base_path}/participants.tsv"
participants_json = f"{base_path}/participants.json"

If your dataset uses per-subject or per-session metadata:

  • Derive the paths dynamically from the EDF filename (see the sketch below).
  • Validate that the files exist before passing them to the Docker task.
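
A hypothetical per-subject lookup could look like this (directory layout and file names are assumptions; adapt to your dataset):

from pathlib import Path

def metadata_paths_for(edf_file: str) -> tuple[str, str]:
    """Return per-subject participants.tsv / participants.json paths (illustrative layout)."""
    subject_dir = Path(edf_file).parent               # e.g., .../sub-0001/
    tsv = subject_dir / "participants.tsv"
    js = subject_dir / "participants.json"
    for p in (tsv, js):
        if not p.exists():
            raise FileNotFoundError(f"Missing metadata file: {p}")
    return str(tsv), str(js)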

d) Docker Image Name

Update the image parameter to match your built container:

image='your-registry/your-image:tag'  # Must match: docker build -t your-registry/your-image:tag .

e) Output Directory

The output path specifies where .rkns files are written:

output_dir = '/storage/rekonas-dataset-output'

Ensure:

  • The path exists on the Airflow worker node (or use --create-dirs).
  • The Airflow worker has write permissions.
  • Sufficient disk space is available.

f) Volume Mounts

The DAG binds host directories into the container. Update both the host path and container mount point:

mounts=[{
    'source': '/absolute/path/on/host/data',  # Your actual data directory
    'target': '/data',                         # Path inside container
    'type': 'bind'
}]

Important: All --input-* and --output-dir arguments in the task command must use container paths (the target), not host paths.

Example:

command=[
    'python', 'main.py',
    '--input-edf', '/data/sub-0001_eeg.edf',          # Container path
    '--input-annotations', '/data/sub-0001_events.tsv',
    '--input-participants', '/data/participants.tsv',
    '--input-participants-json', '/data/participants.json',
    '--output-dir', '/data/output',
    '--create-dirs'
]

g) Example: Full Adaptation

Here's a complete example for a BIDS dataset stored at /mnt/bids:

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
from pathlib import Path
import glob
base_path = "/mnt/bids"
output_dir = "/mnt/bids/derivatives/rkns"
docker_image = "my-org/edf-to-rkns:v1.0"
 
# Find all EDF files
edf_files = sorted(glob.glob(f"{base_path}/**/*_eeg.edf", recursive=True))
 
with DAG('edf_to_rkns_pipeline', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    for edf_file in edf_files:
        tsv_file = edf_file.replace('.edf', '_events.tsv')
 
        task = DockerOperator(
            task_id=f"convert_{Path(edf_file).stem}",
            image=docker_image,
            command=[
                'python', 'main.py',
                '--input-edf', edf_file,
                '--input-annotations', tsv_file,
                '--input-participants', f'{base_path}/participants.tsv',
                '--input-participants-json', f'{base_path}/participants.json',
                '--output-dir', output_dir,
                '--create-dirs'
            ],
            mounts=[{
                'source': base_path,
                'target': base_path,
                'type': 'bind'
            }],
            dag=dag
        )

7. Project Structure

.
├── README.md                                    # This file
├── main.py                                      # Core conversion logic (CLI entry point)
├── build.sh                                     # Docker build script
├── Dockerfile                                   # Container specification
├── pyproject.toml                               # Python dependencies
├── edf_migration_dag.py                         # Airflow DAG template
├── .devcontainer/                               # Dev container config
│   ├── devcontainer.json
│   └── postStartCommand.sh
├── assets/                                      # Mapping files and examples
│   ├── replace_channels.json                    # Channel name mappings
│   ├── event_description_mapping.json           # Event label mappings
│   ├── extracted_channels.txt                   # (Generated) Raw channel names
│   └── extracted_annotation_events.txt          # (Generated) Raw event labels
├── test_replace_channels.py                     # Validation script for channel mappings
├── test_replace_events.py                       # Validation script for event mappings
└── scripts/
    └── convert_annotations_to_rkns_tsv.py       # (To implement) Preprocessing script

8. Workflow Summary

1. **Prepare Annotations**
   - Convert your source annotations (XML, proprietary, etc.) to RKNS-compatible TSV format.
   - Run ''convert_annotations_to_rkns_tsv.py'' on your raw data.
2. **Create Mappings**
   - Extract unique channel names and event labels from your data.
   - Use ''test_replace_channels.py'' and ''test_replace_events.py'' to validate mappings.
   - Refine ''assets/replace_channels.json'' and ''assets/event_description_mapping.json'' iteratively.
3. **Prepare Metadata**
   - Ensure ''participants.tsv'' contains all subject-level metadata.
   - Create or obtain ''participants.json'' with folder paths for each column.
   - Verify that the ''category_mapping'' in ''main.py'' covers all your folders.
4. **Test Locally**
   - Run ''python main.py --input-edf ... --output-dir ... --create-dirs'' on a sample file.
   - Verify the output ''.rkns'' file is valid and contains expected data.
5. **Build Docker Image**
   - Run ''./build.sh'' to create a portable container.
   - Test the container on the same sample file.
6. **Configure Airflow**
   - Adapt ''edf_migration_dag.py'' for your data layout.
   - Deploy to your Airflow instance.
   - Trigger the DAG to process all files.
7. **Monitor and Validate**
   - Check Airflow logs for each task.
   - Verify output ''.rkns'' files are written with correct permissions.
   - Spot-check metadata categorization and event labels.

9. Troubleshooting

EDF file not found

  1. Verify the path in --input-edf is correct and exists.
  2. If using Docker, ensure the path is relative to the container mount point (not the host).

Annotation file missing or invalid

  1. Check that the annotation file naming logic matches your dataset.
  2. Ensure the TSV is in RKNS-compatible format (three columns: onset, duration, event).

Participant ID not found

  1. The participant ID extraction regex may not match your filename pattern.
  2. Update extract_sub_part() in main.py to match your naming convention:

def extract_sub_part(filepath):
    stem = Path(filepath).stem
    match = re.search(r'sub-[a-zA-Z]{4}\d+', stem)  # Adjust regex here
    if not match:
        raise ValueError(f"Could not extract subject id from {filepath}")
    return match.group(0)

Metadata columns not categorized

  1. Verify that participants.json contains a “folder” key for each column.
  2. Check that the folder values are covered by category_mapping.
  3. Add missing mappings to category_mapping as needed.

Unmapped channel or event names

  1. Extract the raw names and add them to the regex mapping JSON.
  2. Test with test_replace_channels.py or test_replace_events.py.
  3. Use the LLM prompts above to generate regex patterns if needed.

Output directory not writable

  1. Ensure the output directory is writable by the process (or Airflow worker).
  2. Use --create-dirs to auto-create the directory with correct permissions.

Docker build fails

  1. Check that all dependencies in pyproject.toml are compatible.
  2. Verify the Dockerfile references the correct base image and Python version.
  3. Check build.sh for any environment variable or credential issues.

10. Key Files Reference

main.py

  1. edf_to_rkns() – Main conversion function. Orchestrates all steps.
  2. validate_rkns_checksum() – Validates data integrity by reading all signal blocks.
  3. extract_sub_part() – Extracts the participant ID from the EDF filename. Customize for your naming convention.
  4. category_mapping – Dictionary mapping folder paths to standardized categories. Extend it if your dataset uses different paths.
  5. parse_args() – CLI argument parser. Do not modify.

assets/replace_channels.json

  1. Regex patterns (keys) → standardized channel names (values).
  2. Applied sequentially in order.
  3. Examples: “EEG(sec)” → “EEG-C3-A2”, “SaO2” → “SPO2”.

assets/event_description_mapping.json

  1. Regex patterns (keys) → standardized event labels (values).
  2. Applied sequentially in order.
  3. Catch-all pattern: unknown events are prefixed with [comment].

participants.json

  1. Column codebook with Description and required folder keys.
  2. Folder values are mapped to categories by category_mapping in main.py.

participants.tsv

  1. Subject-level metadata table (BIDS-compatible).
  2. Rows = subjects, columns = variables (must match participants.json).
  3. First column should be participant_id (e.g., “sub-0001”).

11. Contributing & Customization

This template is intentionally configurable. The main customization points are:

  1. extract_sub_part() in main.py – Update the regex if your participant ID format differs.
  2. category_mapping in main.py – Add or modify mappings if your dataset uses different folder paths.
  3. assets/replace_channels.json – Adapt channel mappings to your EDF sources.
  4. assets/event_description_mapping.json – Adapt event mappings to your annotation sources.
  5. edf_migration_dag.py – Adjust discovery logic, paths, and Docker config for your environment.

For questions or issues, refer to the troubleshooting section or the inline code comments in main.py.


Note: To start a new dataset conversion project, create a new repository using airflow-template-dataset-to-rkns as a template. This provides access to internal Rekonas packages and a foundational workflow.
