====== Turning Datasets into RKNS Format ======

This guide describes how to convert a structured dataset (e.g., BIDS) into the **RKNS format** using the [[https://code.rekonas.com/rekonas/airflow-template-dataset-to-rkns|airflow-template-dataset-to-rkns]] template repository.

The process is divided into three main phases:

  - **Transform Logic** – Implement the conversion from raw data to RKNS (via ''main.py'').
  - **Containerization** – Package the logic into a portable Docker image (''build.sh'' + ''Dockerfile'').
  - **Orchestration** – Run the transformation at scale using Apache Airflow.

Below, we detail each phase with practical guidance.

----

====== 1. Transform Logic (main.py) ======

The core conversion logic lives in ''main.py''. It reads:

  * An **EDF file** (physiological signals),
  * A **TSV annotation file** (onset, duration, event),
  * A **participants.tsv** metadata table (subject-level data),
  * A **participants.json** column dictionary (metadata codebook),

and outputs a validated ''*.rkns'' file (a Zarr zip store).

==== CLI Interface ====

Keep the CLI interface unchanged if possible; it is designed for easier Airflow usage:

<code bash>
python main.py \
  --input-edf                 # EDF file with signal data
  --input-annotations         # TSV with [onset, duration, event]
  --input-participants        # participants.tsv with subject metadata
  --input-participants-json   # participants.json codebook
  --output-dir                # Directory for output .rkns file
  [--create-dirs]             # Optionally create output dir if missing
</code>

===== Conversion Workflow =====

The ''edf_to_rkns()'' function executes the following steps:

==== 1.1 Load and Standardize Signals ====

<code python>
rkns_obj = rkns.from_external_format(
    input_file_edf,
    channel_mapping=replacement_dict,  # From assets/replace_channels.json
    exclude_channels=list(exclude_channels)
)
</code>

This step uses the regex mappings in ''assets/replace_channels.json'' to rename EDF channels to standardized names (e.g., "EEG(sec)" → "EEG-C3-A2").
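
The sequential renaming described above can be illustrated with a small standalone sketch. It is not the ''rkns'' implementation: ''apply_channel_mapping'' is a hypothetical helper, and it assumes that each pattern is applied in file order with ''re.sub'' to the result of the previous rule.

```python
import re


def apply_channel_mapping(name: str, mapping: dict[str, str]) -> str:
    """Apply each regex replacement, in insertion order, to one channel label."""
    for pattern, replacement in mapping.items():
        name = re.sub(pattern, replacement, name)
    return name


# A few rules in the style of assets/replace_channels.json (illustrative subset)
mapping = {
    r"(?i)^EEG\(sec\)\s*$": "EEG-C3-A2",
    r"(?i)^SaO2\s*$": "SPO2",
    "_": "-",
}

raw_names = ["EEG(sec)", "sao2 ", "EMG_CHIN"]
renamed = [apply_channel_mapping(n, mapping) for n in raw_names]
print(renamed)
```

Because rules run in order, a late generic rule such as ''"_": "-"'' also acts on names that earlier rules have already rewritten, so specific patterns should come first.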
==== 1.2 Add Annotations ====

Converts the TSV file to BIDS-compatible format and adds events:

<code python>
rkns.io.csv_onsetduration_to_bids(input_file_tsv, ...)
rkns_obj.add_source_annotations(
    file_path=tmp_path,
    event_description_mapping=event_description_mapping  # From assets/event_description_mapping.json
)
</code>

==== 1.3 Extract and Categorize Metadata ====

  - Matches participant ID (e.g., "sub-0001") from the EDF filename.
  - Looks up the participant in ''participants.tsv''.
  - Uses ''participants.json'' to map columns to RKNS metadata categories (e.g., "demographics", "clinical").
  - Groups metadata by category and adds each to the RKNS object.

==== 1.4 Finalize and Export ====

<code python>
rkns_obj.populate()                  # Build internal structure
rkns_obj.export_as_zip(output_file)  # Write Zarr zip store
</code>

==== 1.5 Validate ====

<code python>
validate_rkns_checksum(output_file)  # Verify checksums on all data
</code>

----

====== 2. Preprocessing: Generate RKNS-Compatible Event Annotations ======

RKNS requires events in a strict tab-separated (TSV) format with three columns: ''onset'', ''duration'', and ''event''.

However, source annotations are often in XML, proprietary formats, or non-compliant TSVs (e.g., using absolute timestamps or missing durations). You must write a preprocessing script to convert these into the required format:

<code>
onset   duration   event
0.0     30.0       stage_AASM_e1_W
30.0    30.0       stage_AASM_e1_N1
60.0    30.0       stage_AASM_e1_N2
...
</code>

==== Key Requirements ====

  - **onset**: Time in seconds relative to EDF start (not absolute wall-clock time). If your source uses absolute time, align it with the EDF's recording start time.
  - **duration**: Event length in seconds. If only onset and end are provided, compute ''duration = end - onset''.
  - **event**: Keep the original event label as-is; do not normalize it here. Normalization happens later via ''assets/event_description_mapping.json''.

==== Recommendation ====

Write this as a standalone, reusable Python script (e.g., ''convert_annotations_to_rkns_tsv.py'').
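
A minimal sketch of the core of such a script follows, assuming the source annotations have already been parsed into ''(start, end, label)'' tuples with absolute timestamps. The function names ''to_rkns_rows'' and ''write_tsv'' and the sample data are hypothetical; parsing the source format and reading the recording start from the EDF header are dataset-specific and left out.

```python
import csv
import io
from datetime import datetime


def to_rkns_rows(events, recording_start):
    """Convert (start, end, label) tuples with absolute times to onset/duration rows."""
    rows = []
    for start, end, label in events:
        onset = (start - recording_start).total_seconds()  # seconds since EDF start
        duration = (end - start).total_seconds()           # duration = end - onset
        rows.append((onset, duration, label))              # label is kept as-is
    return rows


def write_tsv(rows, handle):
    """Write the three-column, tab-separated table with a header row."""
    writer = csv.writer(handle, delimiter="\t", lineterminator="\n")
    writer.writerow(["onset", "duration", "event"])
    writer.writerows(rows)


# Hypothetical sample data: two 30-second epochs starting at the recording start
recording_start = datetime(2024, 1, 1, 22, 0, 0)
events = [
    (datetime(2024, 1, 1, 22, 0, 0), datetime(2024, 1, 1, 22, 0, 30), "stage_AASM_e1_W"),
    (datetime(2024, 1, 1, 22, 0, 30), datetime(2024, 1, 1, 22, 1, 0), "stage_AASM_e1_N1"),
]

rows = to_rkns_rows(events, recording_start)
buffer = io.StringIO()
write_tsv(rows, buffer)
print(buffer.getvalue())
```

In a real script, the ''io.StringIO'' buffer would be replaced by an output file opened with ''newline=""''.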
Keep it in your derived repo for reproducibility. Avoid bash; use ''uv'' for inline dependencies if needed.

----

====== 3. Standardizing Names via Regex Mappings ======

Once you have a compliant TSV, normalize channel and event names non-destructively using regex mappings defined in two JSON files.

===== Channel Names → assets/replace_channels.json =====

EDF channel labels vary widely (e.g., "EEG C3-A2", "EEG(sec)"). Map them to standardized RKNS names:

<code json>
{
  "(?i)^EEG\\(sec\\)\\s*$": "EEG-C3-A2",
  "(?i)^SaO2\\s*$": "SPO2",
  "(?i)^ABDO\\sRES\\s*$": "RESP-ABD",
  "(?i)^THOR\\sRES\\s*$": "RESP-CHEST",
  "_": "-"
}
</code>

Keys are regex patterns (applied sequentially), and values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., ''\1'').

==== Validation ====

Use ''test_replace_channels.py'' to validate your mappings:

1. Extract unique channel names from your EDF files:

<code bash>
find /data -name "*.edf" -exec edf-peek {} \; | grep "signal_labels" | sort | uniq > assets/extracted_channels.txt
</code>

2. Run the test:

<code bash>
python test_replace_channels.py
</code>

3. Inspect the output:

<code bash>
cat out/renamed_channels.csv
</code>

==== AI-Assisted Mapping Generation ====

If you have a list of unique channel names, use this prompt with your LLM to accelerate mapping creation:

<code>
I will provide you a list of physiological signal channels (e.g., EEG, EOG, EMG, respiratory, cardiac) extracted from original EDF files. I require an output in JSON format that maps each raw channel name to a standardized name using **sequential, grep-style regex replacements**.

Key requirements:
1. **Include early normalization rules** to handle common delimiters (e.g., `_`, spaces, `.`, `/`, parentheses) by converting them to hyphens (`-`), collapsing multiple hyphens, and trimming leading/trailing hyphens.
2. All patterns must be **case-insensitive** (use `(?i)`).
3. Use **physiologically meaningful, NSRR/AASM-aligned names**, such as:
   - `EEG-C3-M2` (not `EEG-C3_A2` or ambiguous forms)
   - `EMG-LLEG` / `EMG-RLEG` for leg EMG (not `LAT`/`RAT` as position)
   - `RESP-AIRFLOW-THERM` or `RESP-AIRFLOW-PRES` (not generic `RESP-NASAL`)
   - `EOG-LOC` / `EOG-ROC` for eye channels
   - `EMG-CHIN` for chin EMG
   - `PULSE` for heart rate or pulse signals (unless raw ECG → `ECG`)
4. **Do not include a final catch-all rule** (e.g., `^(.+)$ → MISC-\1`) unless explicitly requested; most channels in the input list should be known and mapped specifically.
5. Replacements are applied **in order**, with each rule operating on the result of the previous one.

Example reference snippet:
```json
{
  "(?i)[\\s_\\./\\(\\),]+": "-",
  "-+": "-",
  "^-|-$": "",
  "(?i)^abdomen$": "RESP-ABD",
  "(?i)^c3-m2$": "EEG-C3-M2",
  "(?i)^lat$": "EMG-LLEG"
}
```

Now, generate a similar JSON mapping for the following list of channel names:
```
[INSERT YOUR UNIQUE CHANNEL LIST HERE]
```

Provide the output **within a JSON code block only**, with no explanations.
</code>

Note that the specific rules in the reference snippet match the already-normalized name (''c3-m2''), because the first rule has converted underscores to hyphens by the time they run.

===== Event Descriptions → assets/event_description_mapping.json =====

Normalize inconsistent event labels (e.g., "stage_AASM_e1_W" → "sleep_stage_wake"):

<code json>
{
  "(?i)^stage_AASM_e1_W$": "sleep_stage_wake",
  "(?i)^stage_AASM_e1_N1$": "sleep_stage_n1",
  "(?i)^stage_AASM_e1_N2$": "sleep_stage_n2",
  "(?i)^stage_AASM_e1_N3$": "sleep_stage_n3",
  "(?i)^stage_AASM_e1_R$": "sleep_stage_rem",
  "^\\s*(?!sleep_stage_|arousal)\"?([^\"]+)\"?\\s*$": "[comment] \\1"
}
</code>

The last pattern acts as a catch-all: any event that does not already start with ''sleep_stage_'' or ''arousal'' is prefixed with ''[comment]'' to prevent validation errors.

==== Validation ====

Use ''test_replace_events.py'' to validate your mappings:

1. Extract unique event names from your annotations:

<code bash>
# -q suppresses the per-file "==> name <==" headers that tail prints for multiple files;
# in bash, the ** glob requires "shopt -s globstar"
tail -q -n +2 /data/**/*_events.tsv | cut -f3 | sort | uniq > assets/extracted_annotation_events.txt
</code>

2. Run the test:

<code bash>
python test_replace_events.py
</code>

3. Inspect the output:

<code bash>
cat out/renamed_events.csv
</code>

==== AI-Assisted Mapping Generation ====

Use this prompt with your LLM to accelerate mapping creation:

<code>
I will provide you a list of sleep events extracted from original annotation files. I require an output in JSON format that maps event names to standardized names using grep-style regex replacements.

For example, this is a reference mapping. Note that replacements are applied in sequential order:
```json
{
  "(?i)^stage_AASM_e1_W$": "sleep_stage_wake",
  "(?i)^stage_AASM_e1_R$": "sleep_stage_rem",
  "(?i)^arousals_e1_\\('arousal_standard',\\s*'EEG_C3'\\)$": "arousal_eeg_c3",
  "^\\s*(?!sleep_stage_|arousal|apnea|hypopnea|desaturation|artifact|body_position)\"?([^\"]+)\"?\\s*$": "[comment] \\1"
}
```

The keys are regex patterns (case-insensitive), and the values are replacement strings. Groups in the pattern can be referenced in the replacement (e.g., `\1`).

Based on the above example, generate a similar JSON mapping for the following list of event names:
```
[INSERT YOUR UNIQUE EVENT LIST HERE]
```

Provide the output within a JSON code block.
</code>

----

====== 4. Metadata Handling ======

RKNS groups metadata by high-level categories (e.g., ''demographics'', ''clinical'', ''questionnaires'') to organize heterogeneous data sources into a consistent internal structure.
To enable this, your ''participants.json'' **must include a** ''"folder"'' **key** for each column, using the **original NSRR folder path** (or your dataset's metadata taxonomy) as the value:

<code json>
{
  "age": {
    "Description": "Age of participant at baseline",
    "folder": "Harmonized/Demographics"
  },
  "BMI": {
    "Description": "Body mass index",
    "folder": "Anthropometry"
  },
  "PHQ_9_total": {
    "Description": "Patient Health Questionnaire total score",
    "folder": "Sleep Questionnaires/Sleep Disturbance"
  }
}
</code>

==== Category Mapping ====

The script uses the ''category_mapping'' dictionary (defined at the top of ''main.py'') to automatically map folder paths to one of these standardized categories:

  * ''administrative'' – Consent, study center, visit info
  * ''anthropometry'' – Height, weight, BMI
  * ''clinical'' – Physical exam, vital signs
  * ''demographics'' – Age, sex, race, ethnicity
  * ''general_health'' – Quality of life, self-reported health
  * ''lifestyle_and_behavioral_health'' – Diet, substance use, physical activity, mental health
  * ''medical_history'' – Medications, comorbidities, surgical history
  * ''questionnaires'' – Survey-based instruments (PHQ-9, GAD-7, ISI, etc.)
  * ''sleep_monitoring'' – Polysomnography, actigraphy, oximetry, respiratory events
  * ''treatment'' – CPAP, therapy, adherence

==== How It Works ====

1. The script extracts the participant ID from the EDF filename (e.g., "sub-0001" from "sub-0001_task-sleep_eeg.edf").
2. It finds the matching row in ''participants.tsv''.
3. For each column in that row, it looks up the column name in ''participants.json'' to find its ''folder''.
4. It maps the ''folder'' to a standardized category using ''category_mapping''.
5. Metadata is grouped by category and added to the RKNS object.

This abstraction allows RKNS to handle diverse datasets (NSRR, custom studies, etc.) while maintaining a uniform schema for downstream analysis.

----

====== 5. CLI: Python & Docker ======

==== Testing the Development CLI ====

Once you've implemented your conversion logic in ''main.py'', test it end-to-end with your actual data:

<code bash>
python main.py \
  --input-edf /path/to/sub-0001_task-sleep_eeg.edf \
  --input-annotations /path/to/sub-0001_task-sleep_eeg_events.tsv \
  --input-participants /path/to/participants.tsv \
  --input-participants-json /path/to/participants.json \
  --output-dir /path/to/output \
  --create-dirs
</code>

The script will:

1. Load the EDF and apply channel mappings.
2. Load annotations and apply event mappings.
3. Extract participant metadata and group by category.
4. Validate checksums on all written data.
5. Output a ''.rkns'' file with 777 permissions.

==== Building and Testing the Docker Image ====

Build the Docker image:

<code bash>
chmod +x build.sh
./build.sh
</code>

This creates an image with the same CLI interface. Run it with:

<code bash>
docker run --rm \
  -v /path/to/data:/data \
  -v /path/to/output:/output \
  your-registry/your-image:tag \
  python main.py \
    --input-edf /data/sub-0001_task-sleep_eeg.edf \
    --input-annotations /data/sub-0001_task-sleep_eeg_events.tsv \
    --input-participants /data/participants.tsv \
    --input-participants-json /data/participants.json \
    --output-dir /output \
    --create-dirs
</code>

**Security Note:** Build args are visible in intermediate layer history, so avoid exposing them in public logs. Avoid storing secrets (API keys, credentials) as build args; use runtime environment variables or mount secrets instead.

==== Testing the Docker-CLI ====

Build with ''./build.sh'' (credentials stay in builder stage) and try running it on the example data with the same arguments as ''run_example.sh''.

----

====== 6. Orchestration: Airflow DAG ======

The provided ''edf_migration_dag.py'' serves as a template for processing all EDF files in your dataset at scale using Apache Airflow.

==== DAG Overview ====

The DAG:

1. Discovers all EDF files in a base directory.
2. For each EDF, constructs paths to its corresponding annotation, participants, and metadata files.
3. Creates a task that runs the Docker container with the appropriate arguments.
4. Collects results and validates completion.

==== Required Adaptations ====

=== a) Input Dataset Path ===

Update ''base_path'' to point to your root BIDS-like directory:

<code python>
base_path = "/your/data/root"  # e.g., "/data/shhs-bids"
edf_files = find_edf_files(base_path)
</code>

=== b) Annotation File Naming Logic ===

The DAG assumes each EDF file has a corresponding annotation file. The default logic replaces ''.edf'' with ''_events.tsv'':

<code python>
tsv_file = edf_file.replace('.edf', '_events.tsv')
</code>

If your annotation files:

  * **Use a different naming pattern**, update the derivation:

<code python>
# Example: annotations in a separate 'annotations/' subfolder
tsv_file = edf_file.replace('/eeg/', '/annotations/').replace('.edf', '.tsv')
</code>

  * **Have inconsistent naming**, implement a lookup function:

<code python>
def find_annotation_file(edf_file: str, base_path: str) -> str:
    """Derive annotation file path from EDF file."""
    # Implement your custom logic here
    raise NotImplementedError
</code>

=== c) Metadata File Paths ===

The DAG uses global ''participants.tsv'' and ''participants.json'' files. Ensure they exist at the dataset root (''base_path''):

<code python>
participants_tsv = f"{base_path}/participants.tsv"
participants_json = f"{base_path}/participants.json"
</code>

If your dataset uses per-subject or per-session metadata:

  * Derive paths dynamically from the EDF filename.
  * Validate that files exist before passing to the Docker task.

=== d) Docker Image Name ===

Update the image parameter to match your built container:

<code python>
image='your-registry/your-image:tag'  # Must match: docker build -t your-registry/your-image:tag .
</code>

=== e) Output Directory ===

The output path specifies where ''.rkns'' files are written:

<code python>
output_dir = '/storage/rekonas-dataset-output'
</code>

Ensure:

  * The path exists on the Airflow worker node (or use ''--create-dirs'').
  * The Airflow worker has write permissions.
  * Sufficient disk space is available.

=== f) Volume Mounts ===

The DAG binds host directories into the container. Update both the host path and container mount point:

<code python>
mounts=[{
    'source': '/absolute/path/on/host/data',  # Your actual data directory
    'target': '/data',                        # Path inside container
    'type': 'bind'
}]
</code>

**Important:** All ''--input-*'' and ''--output-dir'' arguments in the task command must use container paths (the ''target''), not host paths.

Example:

<code python>
command=[
    'python', 'main.py',
    '--input-edf', '/data/sub-0001_eeg.edf',  # Container path
    '--input-annotations', '/data/sub-0001_events.tsv',
    '--input-participants', '/data/participants.tsv',
    '--input-participants-json', '/data/participants.json',
    '--output-dir', '/data/output',
    '--create-dirs'
]
</code>

=== g) Example: Full Adaptation ===

Here's a complete example for a BIDS dataset stored at ''/mnt/bids''. The host directory is mounted at the same path inside the container, so host and container paths coincide:

<code python>
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datetime import datetime
from pathlib import Path  # needed for Path(edf_file).stem below
import glob

base_path = "/mnt/bids"
output_dir = "/mnt/bids/derivatives/rkns"
docker_image = "my-org/edf-to-rkns:v1.0"

# Find all EDF files
edf_files = sorted(glob.glob(f"{base_path}/**/*_eeg.edf", recursive=True))

with DAG('edf_to_rkns_pipeline', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    for edf_file in edf_files:
        tsv_file = edf_file.replace('_eeg.edf', '_eeg.tsv')

        task = DockerOperator(
            task_id=f"convert_{Path(edf_file).stem}",
            image=docker_image,
            command=[
                'python', 'main.py',
                '--input-edf', edf_file,
                '--input-annotations', tsv_file,
                '--input-participants', f'{base_path}/participants.tsv',
                '--input-participants-json', f'{base_path}/participants.json',
                '--output-dir', output_dir,
                '--create-dirs'
            ],
            mounts=[{
                'source': base_path,
                'target': base_path,
                'type': 'bind'
            }],
            dag=dag
        )
</code>

----

====== 7. Project Structure ======

<code>
.
├── README.md                            # This file
├── main.py                              # Core conversion logic (CLI entry point)
├── build.sh                             # Docker build script
├── Dockerfile                           # Container specification
├── pyproject.toml                       # Python dependencies
├── edf_migration_dag.py                 # Airflow DAG template
├── .devcontainer/                       # Dev container config
│   ├── devcontainer.json
│   └── postStartCommand.sh
├── assets/                              # Mapping files and examples
│   ├── replace_channels.json            # Channel name mappings
│   ├── event_description_mapping.json   # Event label mappings
│   ├── extracted_channels.txt           # (Generated) Raw channel names
│   └── extracted_annotation_events.txt  # (Generated) Raw event labels
├── test_replace_channels.py             # Validation script for channel mappings
├── test_replace_events.py               # Validation script for event mappings
└── scripts/
    └── convert_annotations_to_rkns_tsv.py  # (To implement) Preprocessing script
</code>

----

====== 8. Workflow Summary ======

==== End-to-End Process ====

1. **Prepare Annotations**
  - Convert your source annotations (XML, proprietary, etc.) to RKNS-compatible TSV format.
  - Run ''convert_annotations_to_rkns_tsv.py'' on your raw data.

2. **Create Mappings**
  - Extract unique channel names and event labels from your data.
  - Use ''test_replace_channels.py'' and ''test_replace_events.py'' to validate mappings.
  - Refine ''assets/replace_channels.json'' and ''assets/event_description_mapping.json'' iteratively.

3. **Prepare Metadata**
  - Ensure ''participants.tsv'' contains all subject-level metadata.
  - Create or obtain ''participants.json'' with folder paths for each column.
  - Verify that the ''category_mapping'' in ''main.py'' covers all your folders.

4. **Test Locally**
  - Run ''python main.py --input-edf ... --output-dir ... --create-dirs'' on a sample file.
  - Verify the output ''.rkns'' file is valid and contains expected data.

5. **Build Docker Image**
  - Run ''./build.sh'' to create a portable container.
  - Test the container on the same sample file.

6. **Configure Airflow**
  - Adapt ''edf_migration_dag.py'' for your data layout.
  - Deploy to your Airflow instance.
  - Trigger the DAG to process all files.

7. **Monitor and Validate**
  - Check Airflow logs for each task.
  - Verify output ''.rkns'' files are written with correct permissions.
  - Spot-check metadata categorization and event labels.

----

====== 9. Troubleshooting ======

==== EDF File Not Found ====

  - Verify the path in ''--input-edf'' is correct and exists.
  - If using Docker, ensure the path is a container path (under the mount ''target''), not a host path.

==== Annotations File Not Found ====

  - Check that the annotation file naming logic matches your dataset.
  - Ensure the TSV is in RKNS-compatible format (three columns: onset, duration, event).

==== Participant Not Found in participants.tsv ====

  - The participant ID extraction regex may not match your filename pattern.
  - Update ''extract_sub_part()'' in ''main.py'' to match your naming convention:

<code python>
import re
from pathlib import Path

def extract_sub_part(filepath):
    stem = Path(filepath).stem
    # Adjust regex here. As written, it expects four letters before the digits
    # (e.g., "sub-abcd0001") and will not match an ID such as "sub-0001".
    match = re.search(r'sub-[a-zA-Z]{4}\d+', stem)
    if not match:
        raise ValueError(f"Could not extract subject id from {filepath}")
    return match.group(0)
</code>

==== Metadata Columns Not Recognized ====

  - Verify that ''participants.json'' contains a ''"folder"'' key for each column.
  - Check that the ''folder'' values are in ''category_mapping''.
  - Add missing mappings to ''category_mapping'' as needed.

==== Channel or Event Names Not Mapped ====

  - Extract raw names and add them to the regex mapping JSON.
  - Test with ''test_replace_channels.py'' or ''test_replace_events.py''.
  - Use LLM prompts to generate regex patterns if needed.

==== Permission Denied on Output File ====

  - Ensure the output directory is writable by the process (or Airflow worker).
  - Use ''--create-dirs'' to auto-create the directory with correct permissions.

==== Docker Build Fails ====

  - Check that all dependencies in ''pyproject.toml'' are compatible.
  - Verify the ''Dockerfile'' references the correct base image and Python version.
  - Check ''build.sh'' for any environment variable or credential issues.

----

====== 10. Key Files Reference ======

==== main.py ====

  - **''edf_to_rkns()''** – Main conversion function. Orchestrates all steps.
  - **''validate_rkns_checksum()''** – Validates data integrity by reading all signal blocks.
  - **''extract_sub_part()''** – Extracts participant ID from EDF filename. **Customize for your naming convention.**
  - **''category_mapping''** – Dictionary mapping folder paths to standardized categories. **Extend if your dataset uses different paths.**
  - **''parse_args()''** – CLI argument parser. Do not modify.

==== assets/replace_channels.json ====

  - Regex patterns (keys) → standardized channel names (values).
  - Applied sequentially in order.
  - Examples: "EEG(sec)" → "EEG-C3-A2", "SaO2" → "SPO2".

==== assets/event_description_mapping.json ====

  - Regex patterns (keys) → standardized event labels (values).
  - Applied sequentially in order.
  - Catch-all pattern: unknown events prefixed with ''[comment]''.

==== participants.json ====

  - Column codebook with ''Description'' and **required** ''folder'' keys.
  - Folder values are mapped to categories by ''category_mapping'' in ''main.py''.

==== participants.tsv ====

  - Subject-level metadata table (BIDS-compatible).
  - Rows = subjects, columns = variables (must match ''participants.json'').
  - First column should be ''participant_id'' (e.g., "sub-0001").

----

====== 11. Contributing & Customization ======

This template is intentionally configurable. The main customization points are:

  - **''extract_sub_part()'' in main.py** – Update the regex if your participant ID format differs.
  - **''category_mapping'' in main.py** – Add or modify mappings if your dataset uses different folder paths.
  - **''assets/replace_channels.json''** – Adapt channel mappings to your EDF sources.
  - **''assets/event_description_mapping.json''** – Adapt event mappings to your annotation sources.
  - **''edf_migration_dag.py''** – Adjust discovery logic, paths, and Docker config for your environment.

For questions or issues, refer to the troubleshooting section or the inline code comments in ''main.py''.

----

**Note:** To start a new dataset conversion project, create a new repository using [[https://code.rekonas.com/rekonas/airflow-template-dataset-to-rkns|airflow-template-dataset-to-rkns]] as a template. This provides access to internal Rekonas packages and a foundational workflow.