
EDF/BDF Exporter

The EDF/BDF exporter module in biosigIO provides functionality to export biosignal data to EDF (European Data Format) or BDF (BioSemi Data Format) files.
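
The practical difference between the two formats is sample width: EDF stores 16-bit samples and BDF stores 24-bit samples. The sketch below is illustrative only; `digital_range` and `step_size` are hypothetical helpers, not part of biosigIO, and the sketch assumes the physical range is mapped linearly onto the full signed digital range. It shows how much finer the quantization step becomes in BDF for the same physical range.

```python
# Sketch only: these helper names are illustrative and not part of biosigIO.


def digital_range(bits: int) -> tuple[int, int]:
    """Return (digital_min, digital_max) for a signed two's-complement sample."""
    return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1


def step_size(phys_min: float, phys_max: float, bits: int) -> float:
    """Physical units per digital step when the physical range spans the full digital range."""
    dig_min, dig_max = digital_range(bits)
    return (phys_max - phys_min) / (dig_max - dig_min)


edf_step = step_size(-1000.0, 1000.0, bits=16)  # EDF: 16-bit samples
bdf_step = step_size(-1000.0, 1000.0, bits=24)  # BDF: 24-bit samples
print(f"EDF step: {edf_step:.6f}  BDF step: {bdf_step:.9f}")
```

For a fixed physical range, the BDF step is roughly 256 times smaller than the EDF step, which is why the exporter can fall back to BDF when EDF would lose too much precision.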

Module Documentation

biosigio.exporters.edf

_CONSTANT_BULK_PERCENTILES = (0.1, 99.9) module-attribute

_PHYS_FIELD_CHARS = 8 module-attribute

EDFExporter

Exporter for EDF format with channels.tsv generation.
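
Going by the path handling in the source listing for this class, the exporter replaces whatever extension the caller supplies with `.edf` or `.bdf` and writes the channels file next to it as `<stem>_channels.tsv`. A minimal sketch of that naming rule follows; `output_paths` is a hypothetical helper written for illustration, not a biosigIO function.

```python
import os


def output_paths(filepath: str, use_bdf: bool) -> tuple[str, str]:
    """Mirror the exporter's naming: force the data extension, derive the TSV name."""
    stem = os.path.splitext(filepath)[0]
    data_path = stem + (".bdf" if use_bdf else ".edf")
    channels_path = stem + "_channels.tsv"
    return data_path, channels_path


print(output_paths("sub-01_task-rest_emg.dat", use_bdf=True))
```

Because `os.path.splitext` strips the last dotted suffix, a name such as `rec.v2` (no real extension) would become `rec.edf`, so passing a path that already ends in `.edf` or `.bdf` avoids surprises.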

Source code in biosigio/exporters/edf.py
class EDFExporter:
    """Exporter for EDF format with channels.tsv generation."""

    @staticmethod
    def export(
        rec: Recording,
        filepath: str,
        precision_threshold: float = 0.01,
        method: str = "both",
        fft_noise_range: tuple | None = None,
        svd_rank: int | None = None,
        format: Literal["auto", "edf", "bdf"] = "auto",
        bypass_analysis: bool = False,
        events_df: pd.DataFrame | None = None,
        create_channels_tsv: bool = True,
        clip_outliers: bool | str = "auto",
        outlier_sigmas: float = 8.0,
        min_effective_bits: float = 10.0,
        **kwargs,
    ) -> str:
        """
        Export EMG data to EDF/BDF format with optional BIDS-compliant channels.tsv file.

        Args:
            rec: Recording object containing the data
            filepath: Path to save the EDF/BDF file
            precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
            method: Method for signal analysis ('svd', 'fft', or 'both')
                'svd': Uses Singular Value Decomposition for noise floor estimation
                'fft': Uses Fast Fourier Transform for noise floor estimation
                'both': Uses both methods and takes the minimum noise floor (default)
            fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
            svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
            format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
                    If 'edf' or 'bdf' is specified, that format will be used directly.
                    If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based
                    on signal analysis to minimize precision loss while preferring EDF
                    if sufficient.
            bypass_analysis: If True, skip the signal analysis step. Requires format
                             to be explicitly set to 'edf' or 'bdf'. (default: False)
            events_df: Optional DataFrame containing events/annotations to write.
                     Columns should include 'onset', 'duration', 'description'.
                     If None or empty, no annotations are written.
            create_channels_tsv: If True, create a BIDS-compliant channels.tsv file (default: True)
            clip_outliers: Singularity handling for the per-channel physical window.
                'auto' (default): keep the full data range losslessly, but clip rare
                extreme outliers to a robust percentile window ONLY when keeping them
                would drop the bulk signal below ``min_effective_bits`` of resolution
                at the chosen format (a loud warning reports what was clipped). True:
                always clip to the robust window. False: never clip (full range,
                lossless even if a single outlier starves the bulk of resolution).
            outlier_sigmas: Robust z-score threshold for the inlier window: samples
                within ``outlier_sigmas`` x 1.4826 x median-absolute-deviation of the
                median are inliers; the window is their min/max so only genuine
                singularities are clipped (default 8.0).
            min_effective_bits: Resolution floor (in bits) the bulk signal must keep
                under 'auto'; outliers are clipped only if the full range would push
                it below this (default 10.0, ~60 dB). Kept low so BDF's 24-bit range
                preserves moderate outliers losslessly and only true singularities
                (or forced/low-res EDF) trigger clipping.
            **kwargs: Additional arguments for the exporter
        """
        if rec.signals is None:
            raise ValueError("No signals to export")

        print("\nSignal Analysis:")
        print("--------------")

        # Initialize format decision variables
        use_bdf = False
        bdf_reason = ""

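        # --- Format Decision and Bypass Check ---
        # Normalize once: the checks below are case-insensitive, but the later
        # `format == "auto"` comparisons are exact, so "AUTO" would otherwise
        # skip the analysis-driven format decision.
        format = format.lower()
        if bypass_analysis and format == "auto":
            raise ValueError("Cannot bypass analysis when format is set to 'auto'.")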

        if format.lower() == "bdf":
            use_bdf = True
            if not bypass_analysis:
                print("\nUser specified BDF format (24-bit).")
            else:
                # Log critical only if bypassing, already logged in Recording.to_edf
                pass  # logging.log(logging.CRITICAL, "Skipping analysis, using specified BDF format.")
        elif format.lower() == "edf":
            use_bdf = False
            if not bypass_analysis:
                print("\nUser specified EDF format (16-bit).")
            else:
                # Log critical only if bypassing, already logged in Recording.to_edf
                pass  # logging.log(logging.CRITICAL, "Skipping analysis, using specified EDF format.")
        elif format.lower() != "auto":
            warnings.warn(
                f"Unknown format: {format}. Valid options are 'auto', 'edf', or 'bdf'. Using 'auto'.",
                stacklevel=2,
            )
            format = "auto"  # Default to auto if invalid format given
            bypass_analysis = False  # Cannot bypass if format is auto

        signal_analyses = {}
        signal_info_strings = []

        # --- Conditional Signal Analysis ---
        if not bypass_analysis:
            # Analyze signals (needed for summary and potentially for 'auto' format decision)
            for ch_name in rec.channels:
                signal = rec.signals[ch_name].values
                ch_info = rec.channels[ch_name]

                # Analyze signal characteristics
                analysis = analyze_signal(
                    signal, method=method, fft_noise_range=fft_noise_range, svd_rank=svd_rank
                )
                recommend_bdf, reason, snr = determine_format_suitability(signal, analysis)
                analysis["snr"] = snr
                analysis["recommend_bdf"] = recommend_bdf
                analysis["reason"] = reason
                signal_analyses[ch_name] = analysis  # Store analysis for later summary

                # If format is 'auto', check if any channel recommends BDF
                if format == "auto" and recommend_bdf:
                    use_bdf = True  # Switch to BDF if any channel needs it
                    if not bdf_reason:  # Capture the first reason
                        bdf_reason = f"Channel '{ch_name}': {reason}"

                # Prepare info string for printing later
                signal_info_strings.append(
                    f"\n  {ch_name}:"
                    f"\n    Range: {analysis['range']:.8g} {ch_info['physical_dimension']}"
                    f"\n    Dynamic Range: {analysis['dynamic_range_db']:.1f} dB"
                    f"\n    Noise Floor: {analysis['noise_floor']:.2e} {ch_info['physical_dimension']}"
                    f"\n    SNR: {snr:.1f} dB"
                    f"\n    Method: {analysis.get('method', 'svd')}"
                    f"\n    Recommended Format: {'BDF' if recommend_bdf else 'EDF'} ({reason})"
                )

            # Print analysis details after deciding the format
            for info_str in signal_info_strings:
                print(info_str)

            # Final format decision message for 'auto' mode
            if format == "auto":
                if use_bdf:
                    print(
                        "\nUsing BDF format (24-bit) based on signal analysis to preserve precision."
                    )
                    print(f"Reason: {bdf_reason}")
                    warnings.warn(
                        f"Using BDF format based on signal analysis. Reason: {bdf_reason}",
                        stacklevel=2,
                    )
                else:
                    print(
                        "\nUsing EDF format (16-bit) based on signal analysis (precision within acceptable range)."
                    )
        # else: # bypass_analysis is True - logging handled in Recording.to_edf
        #     pass # logging.log(logging.CRITICAL, "Signal analysis bypassed.")

        # Initialize BIDS-compliant channels.tsv data structure
        # Required columns in BIDS order: name, type, units (optional columns follow)
        channels_tsv_data = {
            "name": [],
            "type": [],
            "units": [],
            "sampling_frequency": [],
            "reference": [],
            "status": [],
        }
        channel_info_list = []

        # EDF/BDF export requires a single sampling rate across channels: biosigio stores
        # all channels on one uniform-length grid, and pyedflib's writeSamples produces
        # an unreadable file when per-channel record counts differ. Fail loudly instead
        # of writing a corrupt file; mixed-rate sources (e.g. Trigno EMG + ACC) must be
        # resampled to a common rate before export.
        distinct_rates = {int(rec.channels[ch]["sample_frequency"]) for ch in rec.channels}
        if len(distinct_rates) > 1:
            raise ValueError(
                "EDF/BDF export requires a single sampling rate across all channels, but "
                f"multiple were found: {sorted(distinct_rates)} Hz. Resample channels to a "
                "common rate before exporting."
            )

        if use_bdf:
            filepath = os.path.splitext(filepath)[0] + ".bdf"
            filetype = pyedflib.FILETYPE_BDFPLUS
        else:
            filepath = os.path.splitext(filepath)[0] + ".edf"
            filetype = pyedflib.FILETYPE_EDFPLUS

        writer = pyedflib.EdfWriter(filepath, len(rec.channels), file_type=filetype)

        try:
            # MEMORY OPTIMIZATION: Two-pass approach to avoid holding all signals in memory
            # Pass 1: Collect headers only (compute min/max without copying data)
            for _i, ch_name in enumerate(rec.channels):
                signal = rec.signals[ch_name].values
                ch_info = rec.channels[ch_name]

                # Resolve the physical window the bounds will bracket. Handle the
                # empty/all-NaN edge case, then choose the window (full range, or a
                # robust window when 'auto'/True clips genuine singularities).
                if signal.size == 0 or np.all(np.isnan(signal)):
                    warnings.warn(
                        f"Channel '{ch_name}' has an empty or all-NaN signal. "
                        "Using default min/max of 0.0 for scaling.",
                        stacklevel=2,
                    )
                    win_lo, win_hi, n_clipped, max_excursion = 0.0, 0.0, 0, 0.0
                else:
                    win_lo, win_hi, n_clipped, max_excursion = _resolve_physical_window(
                        signal, use_bdf, clip_outliers, outlier_sigmas, min_effective_bits
                    )
                    if n_clipped > 0:
                        unit = ch_info["physical_dimension"]
                        warnings.warn(
                            f"Channel '{ch_name}': {n_clipped} outlier sample(s) "
                            f"({100.0 * n_clipped / signal.size:.4f}%) will saturate to the "
                            f"robust window [{win_lo:.6g}, {win_hi:.6g}] {unit}, preserving "
                            f"{24 if use_bdf else 16}-bit resolution for the bulk signal "
                            f"(max excursion {max_excursion:.6g} {unit}). "
                            "Pass clip_outliers=False to keep the full range instead.",
                            stacklevel=2,
                        )

                # Calculate scaling factors for header based on the chosen format (use_bdf).
                # physical_min/max are rounded outward to bracket the window, so pyedflib
                # never silently clips the bulk signal (issue #61).
                # scaling_factor is informational (pyedflib derives its own from the
                # physical/digital ranges); the bounds are what matter for fidelity.
                phys_min, phys_max, dig_min, dig_max, _scaling = _determine_scaling_factors(
                    win_lo, win_hi, use_bdf=use_bdf
                )

                # EDF/BDF store physical_min/max as 8-char ASCII. A magnitude needing
                # more digits (|value| >= 1e8, or >= 1e7 once a sign is added) cannot be
                # represented: pyedflib would truncate it and silently scale the channel
                # by powers of ten (reintroducing the #61 corruption) or abort the write.
                # Fail loudly here instead, before any bytes are written, with the only
                # real remedy - rescale the channel to a coarser unit.
                if len(str(phys_min)) > _PHYS_FIELD_CHARS or len(str(phys_max)) > _PHYS_FIELD_CHARS:
                    raise ValueError(
                        f"Channel '{ch_name}': physical range [{phys_min}, {phys_max}] "
                        f"{ch_info['physical_dimension']} needs more than {_PHYS_FIELD_CHARS} "
                        "characters and cannot be stored in the EDF/BDF header without corrupting "
                        "the values. Rescale this channel to a coarser unit (e.g. uV -> mV -> V) "
                        "so the magnitude fits."
                    )

                # Prepare channel header dictionary
                ch_dict = {
                    "label": ch_name[:16],  # EDF+ limits label to 16 chars
                    "dimension": ch_info["physical_dimension"],
                    "sample_frequency": int(ch_info["sample_frequency"]),
                    "physical_max": phys_max,
                    "physical_min": phys_min,
                    "digital_max": dig_max,
                    "digital_min": dig_min,
                    "prefilter": ch_info["prefilter"],
                    "transducer": f"{ch_info.get('channel_type', 'Unknown')} sensor",
                }
                channel_info_list.append(ch_dict)

                # Add to BIDS-compliant channels.tsv data
                channels_tsv_data["name"].append(ch_name)

                # Channels carry a validated channel_type from the modality
                # vocabulary, so use it directly for channels.tsv. This preserves
                # genuine BIDS types (EEG/SEEG/ECOG/...) instead of flattening
                # everything but a short whitelist to MISC.
                bids_type = to_bids_channels_tsv_type(ch_info.get("channel_type", "OTHER"))
                channels_tsv_data["type"].append(bids_type)
                channels_tsv_data["units"].append(ch_info["physical_dimension"])
                channels_tsv_data["sampling_frequency"].append(ch_info["sample_frequency"])
                channels_tsv_data["reference"].append("n/a")
                channels_tsv_data["status"].append("good")

            # Set all headers before writing
            writer.setSignalHeaders(channel_info_list)

            # Pass 2: Write every data record for all signals at once.
            # pyedflib's writePhysicalSamples() writes exactly ONE data record
            # (sample_frequency samples) per call, so calling it once per channel with
            # the full array silently truncated every export to a single record
            # (one second). writeSamples() emits all records for every signal.
            # IMPORTANT: order must match setSignalHeaders; rec.channels preserves
            # insertion order (Python 3.7+) and both passes iterate it.
            signals_to_write = [
                np.nan_to_num(rec.signals[ch_name].values, nan=0.0).astype(np.float64, copy=False)
                for ch_name in rec.channels
            ]
            writer.writeSamples(signals_to_write)

            # Write annotations if provided
            if events_df is not None and not events_df.empty:
                for _index, row in events_df.iterrows():
                    try:
                        # pyedflib uses onset, duration, description
                        onset = float(row["onset"])
                        duration = float(row["duration"])
                        description = str(row["description"])
                        # Write annotation for all channels (-1)
                        writer.writeAnnotation(onset, duration, description)
                    except KeyError as e:
                        warnings.warn(
                            f"Skipping event due to missing column: {e}. Event data: {row}",
                            stacklevel=2,
                        )
                    except (TypeError, ValueError) as e:
                        warnings.warn(
                            f"Skipping event due to invalid data type: {e}. Event data: {row}",
                            stacklevel=2,
                        )

            # Explicitly flush and close the writer to ensure all data is written
            writer.close()

            # Wait a moment to ensure file system operations are complete
            import time

            time.sleep(0.1)

            # Verify the file exists and has the correct size
            if not os.path.exists(filepath):
                raise OSError(f"File {filepath} was not created")

            file_size = os.path.getsize(filepath)
            if file_size == 0:
                raise OSError(f"File {filepath} was created but is empty")

            # Generate BIDS-compliant channels.tsv file if requested
            if create_channels_tsv:
                channels_tsv_path = os.path.splitext(filepath)[0] + "_channels.tsv"
                # Create DataFrame with columns in BIDS-specified order
                # Required columns first: name, type, units
                # Then optional columns in the order they appear in data
                ordered_columns = ["name", "type", "units"]
                optional_columns = [
                    col for col in channels_tsv_data.keys() if col not in ordered_columns
                ]
                column_order = ordered_columns + optional_columns

                channels_df = pd.DataFrame(channels_tsv_data)
                channels_df = channels_df[column_order]
                channels_df.to_csv(channels_tsv_path, sep="\t", index=False, na_rep="n/a")
                print(f"\nBIDS-compliant channels metadata saved to: {channels_tsv_path}")

            # Print summary using stored analyses, only if analysis was performed
            if not bypass_analysis:
                # We need to adapt summarize_channels call slightly or assume it uses the analyses dict
                # Let's refine the analyses dict passed to summarize_channels
                summary_analyses = {}
                for ch_name, analysis in signal_analyses.items():
                    summary_analyses[ch_name] = {
                        "range": analysis["range"],
                        "dynamic_range_db": analysis["dynamic_range_db"],
                        "snr_db": analysis["snr"],
                        "use_bdf": use_bdf,  # Use the final decision for the whole file
                    }

                summary = summarize_channels(cast(dict, rec.channels), summary_analyses)
                print("\nSummary:")
                print(summary)
            else:
                print("\nSummary skipped as signal analysis was bypassed.")

            print(f"\nEMG data exported to: {filepath}")
            return filepath
        except Exception:
            # The writer is closed unconditionally in the finally block below; here
            # we only remove the partially written file so a failed export leaves no
            # corrupt output behind.
            import time

            time.sleep(0.1)

            # filepath is a parameter, so it is always defined at this point.
            if os.path.exists(filepath):
                try:
                    os.unlink(filepath)
                    print(f"Cleaned up partially written file: {filepath}")
                except Exception as unlink_e:
                    print(f"Error during cleanup of {filepath}: {unlink_e}")

            # Bare raise re-raises the active exception with its original traceback.
            raise
        finally:
            if writer is not None:
                writer.close()  # Ensure writer is closed
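
Two checks in the listing above reject an export before any bytes are written: all channels must share one sampling rate, and each physical bound must fit the 8-character header field. The sketch below reproduces both rules in isolation. `check_single_rate` and `fits_header_field` are hypothetical names used for illustration, and the field check copies the listing's `len(str(value))` test rather than pyedflib's own formatting.

```python
PHYS_FIELD_CHARS = 8  # same value as _PHYS_FIELD_CHARS in the module


def check_single_rate(rates: dict[str, float]) -> int:
    """Return the common integer sampling rate, or raise if channels disagree."""
    distinct = {int(rate) for rate in rates.values()}  # int() truncates, as in the listing
    if len(distinct) != 1:
        raise ValueError(
            f"Expected exactly one sampling rate, found: {sorted(distinct)} Hz"
        )
    return distinct.pop()


def fits_header_field(value: float) -> bool:
    """True if the value's string form fits the 8-character physical min/max field."""
    return len(str(value)) <= PHYS_FIELD_CHARS


print(check_single_rate({"EMG1": 2000.0, "EMG2": 2000.0}))
print(fits_header_field(1234.5), fits_header_field(-1234.567))
```

A bound such as `-1234.567` needs nine characters, so under this rule the channel would have to be rescaled to a coarser unit (for example uV to mV) before it could be exported.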

export(rec, filepath, precision_threshold=0.01, method='both', fft_noise_range=None, svd_rank=None, format='auto', bypass_analysis=False, events_df=None, create_channels_tsv=True, clip_outliers='auto', outlier_sigmas=8.0, min_effective_bits=10.0, **kwargs) staticmethod

Export EMG data to EDF/BDF format with optional BIDS-compliant channels.tsv file.

Args:
    rec: Recording object containing the data
    filepath: Path to save the EDF/BDF file
    precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
    method: Method for signal analysis ('svd', 'fft', or 'both')
        'svd': Uses Singular Value Decomposition for noise floor estimation
        'fft': Uses Fast Fourier Transform for noise floor estimation
        'both': Uses both methods and takes the minimum noise floor (default)
    fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
    svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
    format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
        If 'edf' or 'bdf' is specified, that format will be used directly.
        If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based on signal analysis to minimize precision loss while preferring EDF if sufficient.
    bypass_analysis: If True, skip the signal analysis step. Requires format to be explicitly set to 'edf' or 'bdf'. (default: False)
    events_df: Optional DataFrame containing events/annotations to write. Columns should include 'onset', 'duration', 'description'. If None or empty, no annotations are written.
    create_channels_tsv: If True, create a BIDS-compliant channels.tsv file (default: True)
    clip_outliers: Singularity handling for the per-channel physical window.
        'auto' (default): keep the full data range losslessly, but clip rare extreme outliers to a robust percentile window ONLY when keeping them would drop the bulk signal below min_effective_bits of resolution at the chosen format (a loud warning reports what was clipped).
        True: always clip to the robust window.
        False: never clip (full range, lossless even if a single outlier starves the bulk of resolution).
    outlier_sigmas: Robust z-score threshold for the inlier window: samples within outlier_sigmas x 1.4826 x median-absolute-deviation of the median are inliers; the window is their min/max so only genuine singularities are clipped (default 8.0).
    min_effective_bits: Resolution floor (in bits) the bulk signal must keep under 'auto'; outliers are clipped only if the full range would push it below this (default 10.0, ~60 dB). Kept low so BDF's 24-bit range preserves moderate outliers losslessly and only true singularities (or forced/low-res EDF) trigger clipping.
    **kwargs: Additional arguments for the exporter
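
The interplay of `outlier_sigmas` and `min_effective_bits` is easier to see with numbers. The sketch below is not the library's `_resolve_physical_window`, whose body is not shown on this page. `robust_window` follows the inlier rule as described for `outlier_sigmas`, while `effective_bits` is an assumed metric (format bits minus the bits spent covering the outlier range) chosen only to illustrate the idea.

```python
import math
import statistics

MAD_TO_SIGMA = 1.4826  # scales the MAD to a standard deviation for normal data


def robust_window(samples, outlier_sigmas=8.0):
    """Min/max of the samples lying within outlier_sigmas robust sigmas of the median."""
    med = statistics.median(samples)
    mad = statistics.median([abs(x - med) for x in samples])
    limit = outlier_sigmas * MAD_TO_SIGMA * mad
    inliers = [x for x in samples if abs(x - med) <= limit]
    if not inliers:  # degenerate case (e.g. MAD of zero): fall back to the full range
        return min(samples), max(samples)
    return min(inliers), max(inliers)


def effective_bits(bulk_span, full_span, bits):
    """ASSUMED metric: format bits minus the bits spent covering the outlier range."""
    return bits - math.log2(full_span / bulk_span)


# Bulk signal in [-50, 50] plus a single extreme spike.
samples = list(range(-50, 51)) + [1_000_000]
lo, hi = robust_window(samples)
full_span = max(samples) - min(samples)
for name, bits in (("EDF", 16), ("BDF", 24)):
    eb = effective_bits(hi - lo, full_span, bits)
    decision = "clip" if eb < 10.0 else "keep full range"
    print(f"{name}: bulk keeps {eb:.1f} bits -> {decision}")
```

Under these assumptions the same spike forces clipping in 16-bit EDF but not in 24-bit BDF, which matches the stated intent that BDF preserves moderate outliers losslessly. The "~60 dB" figure for 10 bits follows from 20 * log10(2^10), which is about 60.2 dB.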

Source code in biosigio/exporters/edf.py
@staticmethod
def export(
    rec: Recording,
    filepath: str,
    precision_threshold: float = 0.01,
    method: str = "both",
    fft_noise_range: tuple | None = None,
    svd_rank: int | None = None,
    format: Literal["auto", "edf", "bdf"] = "auto",
    bypass_analysis: bool = False,
    events_df: pd.DataFrame | None = None,
    create_channels_tsv: bool = True,
    clip_outliers: bool | str = "auto",
    outlier_sigmas: float = 8.0,
    min_effective_bits: float = 10.0,
    **kwargs,
) -> str:
    """
    Export EMG data to EDF/BDF format with optional BIDS-compliant channels.tsv file.

    Args:
        rec: Recording object containing the data
        filepath: Path to save the EDF/BDF file
        precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
        method: Method for signal analysis ('svd', 'fft', or 'both')
            'svd': Uses Singular Value Decomposition for noise floor estimation
            'fft': Uses Fast Fourier Transform for noise floor estimation
            'both': Uses both methods and takes the minimum noise floor (default)
        fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
        svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
        format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
                If 'edf' or 'bdf' is specified, that format will be used directly.
                If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based
                on signal analysis to minimize precision loss while preferring EDF
                if sufficient.
        bypass_analysis: If True, skip the signal analysis step. Requires format
                         to be explicitly set to 'edf' or 'bdf'. (default: False)
        events_df: Optional DataFrame containing events/annotations to write.
                 Columns should include 'onset', 'duration', 'description'.
                 If None or empty, no annotations are written.
        create_channels_tsv: If True, create a BIDS-compliant channels.tsv file (default: True)
        clip_outliers: Singularity handling for the per-channel physical window.
            'auto' (default): keep the full data range losslessly, but clip rare
            extreme outliers to a robust percentile window ONLY when keeping them
            would drop the bulk signal below ``min_effective_bits`` of resolution
            at the chosen format (a loud warning reports what was clipped). True:
            always clip to the robust window. False: never clip (full range,
            lossless even if a single outlier starves the bulk of resolution).
        outlier_sigmas: Robust z-score threshold for the inlier window: samples
            within ``outlier_sigmas`` x 1.4826 x median-absolute-deviation of the
            median are inliers; the window is their min/max so only genuine
            singularities are clipped (default 8.0).
        min_effective_bits: Resolution floor (in bits) the bulk signal must keep
            under 'auto'; outliers are clipped only if the full range would push
            it below this (default 10.0, ~60 dB). Kept low so BDF's 24-bit range
            preserves moderate outliers losslessly and only true singularities
            (or forced/low-res EDF) trigger clipping.
        **kwargs: Additional arguments for the exporter
    """
    if rec.signals is None:
        raise ValueError("No signals to export")

    print("\nSignal Analysis:")
    print("--------------")

    # Initialize format decision variables
    use_bdf = False
    bdf_reason = ""

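    # --- Format Decision and Bypass Check ---
    # Normalize once: the checks below are case-insensitive, but the later
    # `format == "auto"` comparisons are exact, so "AUTO" would otherwise
    # skip the analysis-driven format decision.
    format = format.lower()
    if bypass_analysis and format == "auto":
        raise ValueError("Cannot bypass analysis when format is set to 'auto'.")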

    if format.lower() == "bdf":
        use_bdf = True
        if not bypass_analysis:
            print("\nUser specified BDF format (24-bit).")
        else:
            # Log critical only if bypassing, already logged in Recording.to_edf
            pass  # logging.log(logging.CRITICAL, "Skipping analysis, using specified BDF format.")
    elif format.lower() == "edf":
        use_bdf = False
        if not bypass_analysis:
            print("\nUser specified EDF format (16-bit).")
        else:
            # Log critical only if bypassing, already logged in Recording.to_edf
            pass  # logging.log(logging.CRITICAL, "Skipping analysis, using specified EDF format.")
    elif format.lower() != "auto":
        warnings.warn(
            f"Unknown format: {format}. Valid options are 'auto', 'edf', or 'bdf'. Using 'auto'.",
            stacklevel=2,
        )
        format = "auto"  # Default to auto if invalid format given
        bypass_analysis = False  # Cannot bypass if format is auto

    signal_analyses = {}
    signal_info_strings = []

    # --- Conditional Signal Analysis ---
    if not bypass_analysis:
        # Analyze signals (needed for summary and potentially for 'auto' format decision)
        for ch_name in rec.channels:
            signal = rec.signals[ch_name].values
            ch_info = rec.channels[ch_name]

            # Analyze signal characteristics
            analysis = analyze_signal(
                signal, method=method, fft_noise_range=fft_noise_range, svd_rank=svd_rank
            )
            recommend_bdf, reason, snr = determine_format_suitability(signal, analysis)
            analysis["snr"] = snr
            analysis["recommend_bdf"] = recommend_bdf
            analysis["reason"] = reason
            signal_analyses[ch_name] = analysis  # Store analysis for later summary

            # If format is 'auto', check if any channel recommends BDF
            if format.lower() == "auto" and recommend_bdf:
                use_bdf = True  # Switch to BDF if any channel needs it
                if not bdf_reason:  # Capture the first reason
                    bdf_reason = f"Channel '{ch_name}': {reason}"

            # Prepare info string for printing later
            signal_info_strings.append(
                f"\n  {ch_name}:"
                f"\n    Range: {analysis['range']:.8g} {ch_info['physical_dimension']}"
                f"\n    Dynamic Range: {analysis['dynamic_range_db']:.1f} dB"
                f"\n    Noise Floor: {analysis['noise_floor']:.2e} {ch_info['physical_dimension']}"
                f"\n    SNR: {snr:.1f} dB"
                f"\n    Method: {analysis.get('method', 'svd')}"
                f"\n    Recommended Format: {'BDF' if recommend_bdf else 'EDF'} ({reason})"
            )

        # Print analysis details after deciding the format
        for info_str in signal_info_strings:
            print(info_str)

        # Final format decision message for 'auto' mode
        if format.lower() == "auto":
            if use_bdf:
                print(
                    "\nUsing BDF format (24-bit) based on signal analysis to preserve precision."
                )
                print(f"Reason: {bdf_reason}")
                warnings.warn(
                    f"Using BDF format based on signal analysis. Reason: {bdf_reason}",
                    stacklevel=2,
                )
            else:
                print(
                    "\nUsing EDF format (16-bit) based on signal analysis (precision within acceptable range)."
                )
    # else: # bypass_analysis is True - logging handled in Recording.to_edf
    #     pass # logging.log(logging.CRITICAL, "Signal analysis bypassed.")

    # Initialize the BIDS-compliant channels.tsv data structure.
    # Required columns in BIDS order: name, type, units; the remaining columns are optional.
    channels_tsv_data = {
        "name": [],
        "type": [],
        "units": [],
        "sampling_frequency": [],
        "reference": [],
        "status": [],
    }
    channel_info_list = []

    # EDF/BDF export requires a single sampling rate across channels: biosigio stores
    # all channels on one uniform-length grid, and pyedflib's writeSamples produces
    # an unreadable file when per-channel record counts differ. Fail loudly instead
    # of writing a corrupt file; mixed-rate sources (e.g. Trigno EMG + ACC) must be
    # resampled to a common rate before export.
    distinct_rates = {int(rec.channels[ch]["sample_frequency"]) for ch in rec.channels}
    if len(distinct_rates) > 1:
        raise ValueError(
            "EDF/BDF export requires a single sampling rate across all channels, but "
            f"multiple were found: {sorted(distinct_rates)} Hz. Resample channels to a "
            "common rate before exporting."
        )

    if use_bdf:
        filepath = os.path.splitext(filepath)[0] + ".bdf"
        filetype = pyedflib.FILETYPE_BDFPLUS
    else:
        filepath = os.path.splitext(filepath)[0] + ".edf"
        filetype = pyedflib.FILETYPE_EDFPLUS

    writer = pyedflib.EdfWriter(filepath, len(rec.channels), file_type=filetype)

    try:
        # MEMORY OPTIMIZATION: Two-pass approach to avoid holding all signals in memory
        # Pass 1: Collect headers only (compute min/max without copying data)
        for _i, ch_name in enumerate(rec.channels):
            signal = rec.signals[ch_name].values
            ch_info = rec.channels[ch_name]

            # Resolve the physical window the bounds will bracket. Handle the
            # empty/all-NaN edge case, then choose the window (full range, or a
            # robust window when 'auto'/True clips genuine singularities).
            if signal.size == 0 or np.all(np.isnan(signal)):
                warnings.warn(
                    f"Channel '{ch_name}' has an empty or all-NaN signal. "
                    "Using default min/max of 0.0 for scaling.",
                    stacklevel=2,
                )
                win_lo, win_hi, n_clipped, max_excursion = 0.0, 0.0, 0, 0.0
            else:
                win_lo, win_hi, n_clipped, max_excursion = _resolve_physical_window(
                    signal, use_bdf, clip_outliers, outlier_sigmas, min_effective_bits
                )
                if n_clipped > 0:
                    unit = ch_info["physical_dimension"]
                    warnings.warn(
                        f"Channel '{ch_name}': {n_clipped} outlier sample(s) "
                        f"({100.0 * n_clipped / signal.size:.4f}%) will saturate to the "
                        f"robust window [{win_lo:.6g}, {win_hi:.6g}] {unit}, preserving "
                        f"{24 if use_bdf else 16}-bit resolution for the bulk signal "
                        f"(max excursion {max_excursion:.6g} {unit}). "
                        "Pass clip_outliers=False to keep the full range instead.",
                        stacklevel=2,
                    )

            # Calculate scaling factors for header based on the chosen format (use_bdf).
            # physical_min/max are rounded outward to bracket the window, so pyedflib
            # never silently clips the bulk signal (issue #61).
            # scaling_factor is informational (pyedflib derives its own from the
            # physical/digital ranges); the bounds are what matter for fidelity.
            phys_min, phys_max, dig_min, dig_max, _scaling = _determine_scaling_factors(
                win_lo, win_hi, use_bdf=use_bdf
            )

            # EDF/BDF store physical_min/max as 8-char ASCII. A magnitude needing
            # more digits (|value| >= 1e8, or >= 1e7 once a sign is added) cannot be
            # represented: pyedflib would truncate it and silently scale the channel
            # by powers of ten (reintroducing the #61 corruption) or abort the write.
            # Fail loudly here instead, before any bytes are written, with the only
            # real remedy - rescale the channel to a coarser unit.
            if len(str(phys_min)) > _PHYS_FIELD_CHARS or len(str(phys_max)) > _PHYS_FIELD_CHARS:
                raise ValueError(
                    f"Channel '{ch_name}': physical range [{phys_min}, {phys_max}] "
                    f"{ch_info['physical_dimension']} needs more than {_PHYS_FIELD_CHARS} "
                    "characters and cannot be stored in the EDF/BDF header without corrupting "
                    "the values. Rescale this channel to a coarser unit (e.g. uV -> mV -> V) "
                    "so the magnitude fits."
                )

            # Prepare channel header dictionary
            ch_dict = {
                "label": ch_name[:16],  # EDF+ limits label to 16 chars
                "dimension": ch_info["physical_dimension"],
                "sample_frequency": int(ch_info["sample_frequency"]),
                "physical_max": phys_max,
                "physical_min": phys_min,
                "digital_max": dig_max,
                "digital_min": dig_min,
                "prefilter": ch_info["prefilter"],
                "transducer": f"{ch_info.get('channel_type', 'Unknown')} sensor",
            }
            channel_info_list.append(ch_dict)

            # Add to BIDS-compliant channels.tsv data
            channels_tsv_data["name"].append(ch_name)

            # Channels carry a validated channel_type from the modality
            # vocabulary, so use it directly for channels.tsv. This preserves
            # genuine BIDS types (EEG/SEEG/ECOG/...) instead of flattening
            # everything but a short whitelist to MISC.
            bids_type = to_bids_channels_tsv_type(ch_info.get("channel_type", "OTHER"))
            channels_tsv_data["type"].append(bids_type)
            channels_tsv_data["units"].append(ch_info["physical_dimension"])
            channels_tsv_data["sampling_frequency"].append(ch_info["sample_frequency"])
            channels_tsv_data["reference"].append("n/a")
            channels_tsv_data["status"].append("good")

        # Set all headers before writing
        writer.setSignalHeaders(channel_info_list)

        # Pass 2: Write every data record for all signals at once.
        # pyedflib's writePhysicalSamples() writes exactly ONE data record
        # (sample_frequency samples) per call, so calling it once per channel with
        # the full array silently truncated every export to a single record
        # (one second). writeSamples() emits all records for every signal.
        # IMPORTANT: order must match setSignalHeaders; rec.channels preserves
        # insertion order (Python 3.7+) and both passes iterate it.
        signals_to_write = [
            np.nan_to_num(rec.signals[ch_name].values, nan=0.0).astype(np.float64, copy=False)
            for ch_name in rec.channels
        ]
        writer.writeSamples(signals_to_write)

        # Write annotations if provided
        if events_df is not None and not events_df.empty:
            for _index, row in events_df.iterrows():
                try:
                    # pyedflib uses onset, duration, description
                    onset = float(row["onset"])
                    duration = float(row["duration"])
                    description = str(row["description"])
                    # Write annotation for all channels (-1)
                    writer.writeAnnotation(onset, duration, description)
                except KeyError as e:
                    warnings.warn(
                        f"Skipping event due to missing column: {e}. Event data: {row}",
                        stacklevel=2,
                    )
                except (TypeError, ValueError) as e:
                    warnings.warn(
                        f"Skipping event due to invalid data type: {e}. Event data: {row}",
                        stacklevel=2,
                    )

        # Explicitly flush and close the writer to ensure all data is written
        writer.close()

        # Wait a moment to ensure file system operations are complete
        import time

        time.sleep(0.1)

        # Verify the file exists and is not empty
        if not os.path.exists(filepath):
            raise OSError(f"File {filepath} was not created")

        file_size = os.path.getsize(filepath)
        if file_size == 0:
            raise OSError(f"File {filepath} was created but is empty")

        # Generate BIDS-compliant channels.tsv file if requested
        if create_channels_tsv:
            channels_tsv_path = os.path.splitext(filepath)[0] + "_channels.tsv"
            # Create DataFrame with columns in BIDS-specified order
            # Required columns first: name, type, units
            # Then optional columns in the order they appear in data
            ordered_columns = ["name", "type", "units"]
            optional_columns = [
                col for col in channels_tsv_data.keys() if col not in ordered_columns
            ]
            column_order = ordered_columns + optional_columns

            channels_df = pd.DataFrame(channels_tsv_data)
            channels_df = channels_df[column_order]
            channels_df.to_csv(channels_tsv_path, sep="\t", index=False, na_rep="n/a")
            print(f"\nBIDS-compliant channels metadata saved to: {channels_tsv_path}")

        # Print summary using stored analyses, only if analysis was performed
        if not bypass_analysis:
            # Build the per-channel dict passed to summarize_channels from the
            # stored analyses, applying the final file-wide format decision.
            summary_analyses = {}
            for ch_name, analysis in signal_analyses.items():
                summary_analyses[ch_name] = {
                    "range": analysis["range"],
                    "dynamic_range_db": analysis["dynamic_range_db"],
                    "snr_db": analysis["snr"],
                    "use_bdf": use_bdf,  # Use the final decision for the whole file
                }

            summary = summarize_channels(cast(dict, rec.channels), summary_analyses)
            print("\nSummary:")
            print(summary)
        else:
            print("\nSummary skipped as signal analysis was bypassed.")

        print(f"\nEMG data exported to: {filepath}")
        return filepath
    except Exception as e:
        # The writer is closed unconditionally in the finally block below; here
        # we only remove the partially written file so a failed export leaves no
        # corrupt output behind.
        import time

        time.sleep(0.1)

        if "filepath" in locals() and os.path.exists(filepath):
            try:
                os.unlink(filepath)
                print(f"Cleaned up partially written file: {filepath}")
            except Exception as unlink_e:
                print(f"Error during cleanup of {filepath}: {unlink_e}")

        raise  # Re-raise the original exception with its traceback intact
    finally:
        if writer is not None:
            writer.close()  # Ensure writer is closed
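
The 'auto' branch of the listing above switches the whole file to BDF as soon as any one channel recommends it, and reports only the first reason it encountered. A minimal sketch of that rule in isolation, assuming the per-channel recommendations have already been computed. `choose_format`, the channel names, and the reason strings are hypothetical and used only for illustration; they are not part of biosigio.

```python
def choose_format(recommendations):
    """Pick one format for the whole file from per-channel recommendations.

    `recommendations` maps channel name -> (recommend_bdf, reason).
    Returns (use_bdf, bdf_reason); bdf_reason is "" when EDF is kept.
    """
    use_bdf = False
    bdf_reason = ""
    for ch_name, (recommend_bdf, reason) in recommendations.items():
        if recommend_bdf:
            use_bdf = True  # one channel is enough to switch the whole file
            if not bdf_reason:  # keep only the first reason seen
                bdf_reason = f"Channel '{ch_name}': {reason}"
    return use_bdf, bdf_reason


# Hypothetical per-channel results (names and reasons are made up).
recs = {
    "EMG1": (False, "16-bit is sufficient"),
    "EMG2": (True, "high dynamic range"),
    "EMG3": (True, "low noise floor"),
}
use_bdf, why = choose_format(recs)
print("BDF" if use_bdf else "EDF", "-", why)
```

Because the decision applies to the whole file, a single demanding channel moves every channel to 24-bit storage; the reasons from later channels are not reported.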

Recording

Core biosignal recording: signals + channels + events + metadata.

Modality-agnostic container for EEG / EMG / iEEG / MEG / stim / marker data imported from any supported format.

Attributes:

signals (pd.DataFrame): Raw signal data with time as index.
metadata (dict): Metadata dictionary containing recording information.
channels (dict): Channel information including type, unit, sampling frequency.
events (pd.DataFrame): Annotations or events associated with the signals, with columns 'onset', 'duration', 'description'.
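
The channels attribute is a plain dict keyed by channel name. The sketch below shows one plausible shape for it, using only the keys that the exporter code on this page reads (`channel_type`, `physical_dimension`, `sample_frequency`, `prefilter`). The channel names and values are made up, and `distinct_sample_rates` is a hypothetical helper that mirrors the single-rate guard rather than a biosigio function.

```python
# Hypothetical channel metadata shaped like Recording.channels.
channels = {
    "EMG1": {
        "channel_type": "EMG",
        "physical_dimension": "uV",
        "sample_frequency": 2000.0,
        "prefilter": "n/a",
    },
    "ACC1": {
        "channel_type": "ACC",
        "physical_dimension": "g",
        "sample_frequency": 100.0,
        "prefilter": "n/a",
    },
}


def distinct_sample_rates(channels):
    """Collect the set of sampling rates declared across all channels."""
    return {info["sample_frequency"] for info in channels.values()}


rates = distinct_sample_rates(channels)
if len(rates) > 1:
    # Mixed rates: such a recording would need resampling to a common
    # rate before an EDF/BDF export could succeed.
    print(f"Mixed sampling rates: {sorted(rates)} Hz")
else:
    print("Single sampling rate; safe to export")
```

Checking the metadata this way before calling the exporter surfaces a mixed-rate recording early, instead of through the `ValueError` raised during export.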

Source code in biosigio/core/emg.py
class Recording:
    """
    Core biosignal recording: signals + channels + events + metadata.

    Modality-agnostic container for EEG / EMG / iEEG / MEG / stim / marker data
    imported from any supported format.

    Attributes:
        signals (pd.DataFrame): Raw signal data with time as index.
        metadata (dict): Metadata dictionary containing recording information.
        channels (dict): Channel information including type, unit, sampling frequency.
        events (pd.DataFrame): Annotations or events associated with the signals,
                               with columns 'onset', 'duration', 'description'.
    """

    def __init__(self):
        """Initialize an empty recording."""
        self.signals = None
        self.metadata = {}
        self.channels = {}
        # Initialize events as an empty DataFrame with specified columns
        self.events = pd.DataFrame(columns=["onset", "duration", "description"])

    def plot_signals(
        self,
        channels=None,
        time_range=None,
        offset_scale=0.8,
        uniform_scale=True,
        detrend=False,
        grid=True,
        title=None,
        show=True,
        plt_module=None,
    ):
        """
        Plot signals in a single plot with vertical offsets.

        Args:
            channels: List of channels to plot. If None, plot all channels.
            time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
            offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
            uniform_scale: Whether to use the same scale for all signals.
            detrend: Whether to remove mean from signals before plotting.
            grid: Whether to show grid lines.
            title: Optional title for the figure.
            show: Whether to display the plot.
            plt_module: Matplotlib pyplot module to use.
        """
        # Delegate to the static plotting function in visualization module
        static_plot_signals(
            rec_object=self,
            channels=channels,
            time_range=time_range,
            offset_scale=offset_scale,
            uniform_scale=uniform_scale,
            detrend=detrend,
            grid=grid,
            title=title,
            show=show,
            plt_module=plt_module,
        )

    @classmethod
    def _infer_importer(cls, filepath: str) -> ImporterName:
        """
        Infer the importer to use based on the file extension.
        """
        # rstrip path separators so a Zarr store passed as a directory with a
        # trailing slash (e.g. "rec.zarr/") still resolves by its ".zarr" suffix.
        extension = os.path.splitext(filepath.rstrip("/\\"))[1].lower()
        if extension in {".edf", ".bdf"}:
            return "edf"
        elif extension in {".set"}:
            return "eeglab"
        elif extension in {".otb", ".otb+"}:
            return "otb"
        elif extension in {".csv", ".txt"}:
            return "csv"
        elif extension in {".hea", ".dat", ".atr"}:
            return "wfdb"
        elif extension in {".xdf", ".xdfz"}:
            return "xdf"
        elif extension in {".fif", ".ds"}:
            return "meg"
        elif extension in {".vhdr"}:
            return "brainvision"
        elif extension in {".parquet", ".feather", ".arrow"}:
            return "tabular"
        elif extension in {
            ".rhd",
            ".rhs",
            ".ns1",
            ".ns2",
            ".ns3",
            ".ns4",
            ".ns5",
            ".ns6",
            ".smr",
            ".smrx",
            ".plx",
            ".pl2",
            ".trc",
            ".ncs",
        }:
            return "neo"
        elif extension == ".zarr":
            return "zarr"
        else:
            raise ValueError(f"Unsupported file extension: {extension}")

    @classmethod
    def from_file(
        cls,
        filepath: str,
        importer: ImporterName | None = None,
        force_csv: bool = False,
        bids_channels: str = "auto",
        **kwargs,
    ) -> "Recording":
        """
        Create a Recording object from a file.

        Args:
            filepath: Path to the input file
            importer: Name of the importer to use. Can be one of the following:
                - 'trigno': Delsys Trigno EMG system (CSV)
                - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
                - 'eeglab': EEGLAB .set files (SET)
                - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
                - 'csv': Generic CSV (or TXT) files with columnar data
                - 'wfdb': Waveform Database (WFDB)
                - 'xdf': XDF format (multi-stream Lab Streaming Layer files)
                - 'meg': MEG via MNE (.fif and CTF .ds; requires the 'meg' extra)
                - 'brainvision': BrainVision .vhdr via MNE (requires the 'meg' extra)
                - 'tabular': biosigIO Parquet/Arrow/Feather (requires the 'arrow' extra)
                - 'neo': proprietary electrophysiology formats via python-neo
                  (Intan, Blackrock, Spike2, Plexon, Micromed, Neuralynx, ...;
                  requires the 'neo' extra)
                - 'zarr': biosigIO Zarr serving store (requires the 'zarr' extra)
                If None, the importer will be inferred from the file extension.
                Automatic import is supported for CSV/TXT files.
            force_csv: If True and importer is 'csv', forces using the generic CSV
                      importer even if the file appears to match a specialized format.
            bids_channels: When 'auto' (default), look for a sibling BIDS
                      _channels.tsv next to the file and apply its per-channel
                      type/units over the importer's inferred values. Pass 'off'
                      to disable.
            **kwargs: Additional arguments passed to the importer.
                For XDF files, useful kwargs include:
                - stream_names: List of stream names to import
                - stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
                - stream_ids: List of stream IDs to import

        Returns:
            Recording: New Recording object with loaded data
        """
        if importer is None:
            importer = cls._infer_importer(filepath)

        importers = {
            "trigno": "TrignoImporter",  # CSV with Delsys Trigno Headers
            "otb": "OTBImporter",  # OTB/OTB+ EMG system data
            "edf": "EDFImporter",  # EDF/EDF+/BDF format
            "eeglab": "EEGLABImporter",  # EEGLAB .set files
            "csv": "CSVImporter",  # Generic CSV/Text files
            "wfdb": "WFDBImporter",  # Waveform Database format
            "xdf": "XDFImporter",  # XDF multi-stream format
            "meg": "MEGImporter",  # MEG via MNE (.fif, CTF .ds)
            "brainvision": "BrainVisionImporter",  # BrainVision via MNE (.vhdr)
            "tabular": "TabularImporter",  # biosigIO Parquet / Arrow / Feather
            "neo": "NeoImporter",  # proprietary ephys via python-neo
            "zarr": "ZarrImporter",  # biosigIO Zarr serving store
        }

        if importer not in importers:
            raise ValueError(
                f"Unsupported importer: {importer}. "
                f"Available importers: {list(importers.keys())}\n"
                "- trigno: Delsys Trigno EMG system\n"
                "- otb: OTB/OTB+ EMG system\n"
                "- edf: EDF/EDF+/BDF format\n"
                "- eeglab: EEGLAB .set files\n"
                "- csv: Generic CSV/Text files\n"
                "- wfdb: Waveform Database\n"
                "- xdf: XDF multi-stream format\n"
                "- meg: MEG via MNE (.fif, CTF .ds)\n"
                "- brainvision: BrainVision via MNE (.vhdr)\n"
                "- tabular: biosigIO Parquet/Arrow/Feather (.parquet, .feather, .arrow)\n"
                "- neo: proprietary electrophysiology formats via python-neo "
                "(Intan, Blackrock, Spike2, Plexon, Micromed, Neuralynx, ...)\n"
                "- zarr: biosigIO Zarr serving store (.zarr)"
            )

        # If using CSV importer and force_csv is set, pass it as force_generic
        if importer == "csv":
            kwargs["force_generic"] = force_csv

        # Import the appropriate importer module, then look up its importer class
        import importlib

        importer_module = importlib.import_module(f"biosigio.importers.{importer}")
        importer_class = getattr(importer_module, importers[importer])

        # Create importer instance and load data
        rec = importer_class().load(filepath, **kwargs)

        # Record provenance: which format this recording came from. setdefault so a
        # re-imported serialization file (tabular/zarr) keeps the ORIGINAL
        # source_format restored from its metadata rather than being relabeled.
        rec.metadata.setdefault("source_format", importer)

        # In a BIDS layout, the sibling _channels.tsv is the authoritative source
        # of per-channel type/units; apply it over the importer's header/label
        # guesses unless explicitly disabled with bids_channels="off".
        if bids_channels != "off":
            from ..bids import apply_channels_tsv, find_channels_tsv

            channels_tsv = find_channels_tsv(filepath)
            if channels_tsv:
                apply_channels_tsv(rec, channels_tsv)

        return rec

    def select_channels(
        self,
        channels: str | list[str] | None = None,
        channel_type: str | None = None,
        inplace: bool = False,
        *,
        modality: str | None = None,
    ) -> "Recording":
        """
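        Select specific channels from the data and return them as a Recording object.

        Args:
            channels: Channel name or list of channel names to select. If None and
                    channel_type (or modality) is specified, selects all matching
                    channels.
            channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.).
                        If specified with channels, filters the selection to only
                        channels of this type.
            inplace: If False (default), return a new Recording and leave this one
                    unchanged. If True, replace this recording's signals, channels,
                    and metadata with the selection and return self.
            modality: Modality to select (keyword-only). If specified with channels,
                    filters the selection to only channels of this modality.

        Returns:
            Recording: A new Recording object containing only the selected channels,
                or self when inplace=True.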

        Examples:
            # Select specific channels
            new_rec = rec.select_channels(['EMG1', 'ACC1'])

            # Select all EMG channels
            emg_only = rec.select_channels(channel_type='EMG')

            # Select specific EMG channels only, this example does not select ACC channels
            emg_subset = rec.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
        """
        if self.signals is None:
            raise ValueError("No signals loaded")

        if channels is None and channel_type is None and modality is None:
            raise ValueError("Specify at least one of: channels, channel_type, or modality.")

        # If type/modality specified but no channels, select all matching channels
        if channels is None and channel_type is not None:
            channels = self.get_channels_by_type(channel_type)
            if not channels:
                raise ValueError(f"No channels found of type: {channel_type}")
        elif channels is None and modality is not None:
            channels = self.get_channels_by_modality(modality)
            if not channels:
                raise ValueError(f"No channels found of modality: {modality}")
        elif isinstance(channels, str):
            channels = [channels]

        if channels is None:
            raise ValueError("Specify at least one of: channels, channel_type, or modality.")

        # Validate channels exist
        if not all(ch in self.signals.columns for ch in channels):
            missing = [ch for ch in channels if ch not in self.signals.columns]
            raise ValueError(f"Channels not found: {missing}")

        # Filter by type if specified
        if channel_type is not None:
            channels = [ch for ch in channels if self.channels[ch]["channel_type"] == channel_type]
            if not channels:
                raise ValueError(f"None of the selected channels are of type: {channel_type}")

        # Filter by modality if specified
        if modality is not None:
            canonical_modality = validate_modality(modality)
            channels = [
                ch for ch in channels if self.channels[ch].get("modality") == canonical_modality
            ]
            if not channels:
                raise ValueError(f"None of the selected channels are of modality: {modality}")

        # Create new Recording object
        new_rec = Recording()

        # Copy selected signals and channels
        new_rec.signals = self.signals[channels].copy()
        new_rec.channels = {ch: self.channels[ch].copy() for ch in channels}

        # Copy metadata
        new_rec.metadata = self.metadata.copy()

        if not inplace:
            return new_rec
        else:
            self.signals = new_rec.signals
            self.channels = new_rec.channels
            self.metadata = new_rec.metadata
            return self

    def resample(self, target_rate: float) -> "Recording":
        """Return a NEW, anti-aliased down-sampled copy of this recording.

        Low-resolution demos need a smaller, lighter recording; this rebuilds the
        uniform signal grid at ``target_rate`` using a polyphase resampler
        (``scipy.signal.resample_poly``), which applies a Kaiser-windowed sinc
        anti-alias FIR before decimation. A naive stride-decimation would fold
        energy above the new Nyquist back into the band (aliasing); resample_poly
        removes that energy first, so no aliasing occurs.

        Non-destructive: ``self`` is left untouched and a new Recording is returned,
        mirroring ``select_channels``'s copy semantics.

        Resampling factors come from the integer source/target rates:
        ``g = gcd(int(src), int(target)); up = int(target)//g; down = int(src)//g``
        and ``resample_poly(x, up, down)`` runs once, vectorized over all channels
        along ``axis=0``.

        Args:
            target_rate: Desired sampling rate in Hz. Must be <= the source rate
                (this is a DOWN-sampling helper). A target equal to the source
                returns an unchanged copy; a target above it raises ``ValueError``
                rather than silently up-sampling (up-sampling cannot recover
                detail and is out of scope for the low-res pipeline).

        Returns:
            Recording: A new Recording with the resampled signals, each channel's
                ``sample_frequency`` set to the achieved rate (source * up / down,
                which equals ``target_rate`` for integer rates), and channel/recording
                metadata and events preserved. Events are unchanged because their
                onsets/durations are in SECONDS, which stay valid under any rate
                change (only the per-sample grid shrinks, not wall-clock time).

        Raises:
            ValueError: If no signals are loaded, if ``target_rate`` is not
                positive, if channels do not share a single ``sample_frequency``
                (biosigio stores one uniform grid; mixed-rate resampling is out of
                scope), or if ``target_rate`` exceeds the source rate.
        """
        from math import gcd

        from scipy.signal import resample_poly

        if self.signals is None:
            raise ValueError("No signals loaded")

        if target_rate <= 0:
            raise ValueError(f"target_rate must be positive, got {target_rate}")

        # biosigio stores all channels on one uniform-length grid; a per-channel rate
        # mix is out of scope here, matching the exporter's single-rate guard.
        distinct_rates = {info["sample_frequency"] for info in self.channels.values()}
        if len(distinct_rates) > 1:
            raise ValueError(
                "Resampling requires a single sampling rate across all channels, but "
                f"multiple were found: {sorted(distinct_rates)} Hz. biosigio stores one "
                "uniform grid; resample each rate group separately."
            )

        source_rate = float(next(iter(distinct_rates)))

        # Down-sampling only: refuse to up-sample/alias; an equal rate is a no-op
        # copy so callers can resample unconditionally without special-casing.
        if target_rate > source_rate:
            raise ValueError(
                f"target_rate {target_rate} Hz exceeds source rate {source_rate} Hz; "
                "resample() only down-samples (low-res). Up-sampling is out of scope."
            )

        new_rec = Recording()
        new_rec.channels = {ch: info.copy() for ch, info in self.channels.items()}
        new_rec.metadata = self.metadata.copy()
        # Onsets/durations are in SECONDS, so they remain valid after the grid
        # changes; copy them through unchanged.
        new_rec.events = self.events.copy() if self.events is not None else self.events

        if target_rate == source_rate:
            # No grid change: copy signals through untouched (fresh RangeIndex for
            # consistency with the resampled path).
            new_rec.signals = self.signals.copy().reset_index(drop=True)
            return new_rec

        # Rational resampling factors from the integer rates.
        src_i = int(round(source_rate))
        tgt_i = int(round(target_rate))
        g = gcd(src_i, tgt_i)
        up = tgt_i // g
        down = src_i // g

        # The achieved rate is exactly source * up / down. Store THAT, not the
        # requested float, so the metadata can never disagree with the data (a
        # non-integer or odd target snaps to the nearest achievable rational rate;
        # warn so the caller knows). This avoids silently writing e.g. 99.5 Hz
        # onto a grid that resample_poly actually produced at 100 Hz.
        actual_rate = source_rate * up / down
        if abs(actual_rate - target_rate) > 1e-9:
            logging.warning(
                "Requested resample to %g Hz; nearest achievable rational rate is "
                "%g Hz, which is what is stored on the channels.",
                target_rate,
                actual_rate,
            )

        columns = list(self.signals.columns)
        data = self.signals.to_numpy(dtype=float)
        # resample_poly over axis=0 resamples every channel column at once with the
        # shared anti-alias FIR.
        resampled = resample_poly(data, up, down, axis=0)

        new_rec.signals = pd.DataFrame(resampled, columns=columns)
        new_rec.signals.index = pd.RangeIndex(len(new_rec.signals))
        for info in new_rec.channels.values():
            info["sample_frequency"] = actual_rate

        return new_rec

    def get_channel_types(self) -> list[str]:
        """
        Get list of unique channel types in the data.

        Returns:
            List of channel types (e.g., ['EMG', 'ACC', 'GYRO']). The list is
            built from a set, so its order is not guaranteed.
        """
        return list({info["channel_type"] for info in self.channels.values()})

    def get_channels_by_type(self, channel_type: str) -> list[str]:
        """
        Get list of channels of a specific type.

        Args:
            channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

        Returns:
            List of channel names of the specified type
        """
        return [ch for ch, info in self.channels.items() if info["channel_type"] == channel_type]

    def get_modalities(self) -> list[str]:
        """
        Get the list of unique modalities present in the data.

        Returns:
            List of modalities (e.g., ['EEG', 'EMG', 'MISC']).
        """
        return list(
            {info.get("modality") for info in self.channels.values() if info.get("modality")}
        )

    def get_channels_by_modality(self, modality: str) -> list[str]:
        """
        Get the channels belonging to a given modality.

        Args:
            modality: Modality to filter by ('EEG', 'EMG', 'IEEG', 'MEG', 'BEH', 'MISC').

        Returns:
            List of channel names of the specified modality.
        """
        canonical_modality = validate_modality(modality)
        return [
            ch for ch, info in self.channels.items() if info.get("modality") == canonical_modality
        ]

    def to_edf(
        self,
        filepath: str,
        method: str = "both",
        fft_noise_range: tuple | None = None,
        svd_rank: int | None = None,
        precision_threshold: float = 0.01,
        format: Literal["auto", "edf", "bdf"] = "auto",
        bypass_analysis: bool | None = None,
        verify: bool = False,
        verify_tolerance: float = 1e-6,
        verify_channel_map: dict[str, str] | None = None,
        verify_plot: bool = False,
        events_df: pd.DataFrame | None = None,
        create_channels_tsv: bool = True,
        clip_outliers: bool | str = "auto",
        **kwargs,
    ) -> dict | None:
        """
        Export the recording to EDF/BDF format, optionally including events.

        Args:
            filepath: Path to save the EDF/BDF file
            method: Method for signal analysis ('svd', 'fft', or 'both')
                'svd': Uses Singular Value Decomposition for noise floor estimation
                'fft': Uses Fast Fourier Transform for noise floor estimation
                'both': Uses both methods and takes the minimum noise floor (default)
            fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
            svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
            precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
            format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
                    If 'edf' or 'bdf' is specified, that format will be used directly.
                    If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based
                    on signal analysis to minimize precision loss while preferring EDF
                    if sufficient.
            bypass_analysis: If True, skip signal analysis step when format is explicitly
                             set to 'edf' or 'bdf'. If None (default), analysis is skipped
                             automatically when format is forced. Set to False to force
                             analysis even with a specified format. Ignored if format='auto'.
            verify: If True, reload the exported file and compare signals with the original
                    to check for data integrity loss. Results are printed. (default: False)
            verify_tolerance: Absolute tolerance used when comparing signals during verification. (default: 1e-6)
            verify_channel_map: Optional dictionary mapping original channel names (keys)
                                to reloaded channel names (values) for verification.
                                Used if `verify` is True and channel names might differ.
            verify_plot: If True and verify is True, plots a comparison of original vs reloaded signals.
            events_df: Optional DataFrame with events ('onset', 'duration', 'description').
                      If None, self.events is exported.
            create_channels_tsv: If True, create a BIDS-compliant channels.tsv file (default: True)
            clip_outliers: Singularity handling for the per-channel physical window.
                'auto' (default) keeps the full range losslessly but clips rare extreme
                outliers to a robust window only when keeping them would crater the bulk
                signal's resolution at the chosen format (with a warning); True always
                clips to the robust window; False never clips. See EDFExporter.export for
                the advanced ``outlier_sigmas`` / ``min_effective_bits`` knobs.
            **kwargs: Additional arguments for the EDF exporter

        Returns:
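            dict | None: If verify is True, a dictionary of verification results
                (per-channel comparisons plus a "channel_summary" entry; if the
                reload or comparison fails, an "error" message with
                comparison_mode "failed"). Otherwise, returns None.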

        Raises:
            ValueError: If no signals are loaded
        """
        from ..exporters.edf import EDFExporter  # Local import

        if self.signals is None:
            raise ValueError("No signals loaded")

        # --- Determine if analysis should be bypassed ---
        final_bypass_analysis = False
        if format.lower() == "auto":
            if bypass_analysis is True:
                logging.warning(
                    "bypass_analysis=True ignored because format='auto'. Analysis is required."
                )
            # Analysis is always needed for 'auto' format
            final_bypass_analysis = False
        elif format.lower() in ["edf", "bdf"]:
            if bypass_analysis is None:
                # Default behaviour: skip analysis if format is forced
                final_bypass_analysis = True
                msg = (
                    f"Format forced to '{format}'. Skipping signal analysis for faster export. "
                    "Set bypass_analysis=False to force analysis."
                )
                # Advisory message, not a failure: warn rather than log at CRITICAL.
                logging.warning(msg)
            elif bypass_analysis is True:
                final_bypass_analysis = True
                logging.info("bypass_analysis=True set. Skipping signal analysis.")
            else:  # bypass_analysis is False
                final_bypass_analysis = False
                logging.info(
                    f"Format forced to '{format}' but bypass_analysis=False. Performing signal analysis."
                )
        else:
            # Literal hints are not enforced at runtime, so an unexpected value can
            # still arrive here; fall back to 'auto'.
            logging.warning(
                f"Unknown format '{format}'. Defaulting to 'auto' behavior (analysis enabled)."
            )
            format = "auto"
            final_bypass_analysis = False

        # Determine which events DataFrame to use
        if events_df is None:
            events_to_export = self.events
        else:
            events_to_export = events_df

        # Combine parameters
        all_params: dict[str, Any] = {
            "precision_threshold": precision_threshold,
            "method": method,
            "fft_noise_range": fft_noise_range,
            "svd_rank": svd_rank,
            "format": format,
            "bypass_analysis": final_bypass_analysis,
            "events_df": events_to_export,  # Pass the events dataframe
            "create_channels_tsv": create_channels_tsv,
            "clip_outliers": clip_outliers,
            **kwargs,
        }

        EDFExporter.export(self, filepath, **all_params)

        verification_report_dict = None
        if verify:
            logging.info(f"Verification requested. Reloading exported file: {filepath}")
            try:
                # Reload the exported file
                reloaded_rec = Recording.from_file(filepath, importer="edf")

                logging.info("Comparing original signals with reloaded signals...")
                # Compare signals using the imported function
                verification_results = compare_signals(
                    self, reloaded_rec, tolerance=verify_tolerance, channel_map=verify_channel_map
                )

                # Generate and log report using the imported function
                report_verification_results(verification_results, verify_tolerance)
                verification_report_dict = verification_results

                # Plot comparison using imported function if requested
                summary = verification_results.get("channel_summary", {})
                comparison_mode = summary.get("comparison_mode", "unknown")
                compared_count = sum(1 for k in verification_results if k != "channel_summary")

                if verify_plot and compared_count > 0 and comparison_mode != "failed":
                    plot_comparison(self, reloaded_rec, channel_map=verify_channel_map)
                elif verify_plot:
                    logging.warning(
                        "Skipping verification plot: No channels were successfully compared."
                    )

            except Exception as e:
                logging.error(f"Verification failed during reload or comparison: {e}")
                verification_report_dict = {
                    "error": str(e),
                    "channel_summary": {"comparison_mode": "failed"},
                }

        return verification_report_dict

    def to_parquet(self, filepath: str) -> str:
        """Export to a self-describing biosigIO Parquet file.

        Signals are stored as a columnar table (channels = columns, time index
        preserved); channels/events/metadata travel in the file's schema metadata,
        so ``Recording.from_file`` round-trips it losslessly. Great for analytics
        (DuckDB/Polars/pandas/Spark). Requires the ``arrow`` extra (pyarrow).

        Args:
            filepath: Output ``.parquet`` path.

        Returns:
            str: The written file path.
        """
        from ..exporters.tabular import TabularExporter

        return TabularExporter.to_parquet(self, filepath)

    def to_arrow(self, filepath: str) -> str:
        """Export to a biosigIO Arrow/Feather file (fast zero-copy IPC).

        Same self-describing schema as :meth:`to_parquet`; round-trips via
        ``Recording.from_file``. Requires the ``arrow`` extra (pyarrow).

        Args:
            filepath: Output ``.feather`` / ``.arrow`` path.

        Returns:
            str: The written file path.
        """
        from ..exporters.tabular import TabularExporter

        return TabularExporter.to_arrow(self, filepath)

    def to_zarr(self, filepath: str, **kwargs) -> str:
        """Export to a sharded Zarr v3 serving store with a min/max view pyramid.

        Writes one cloud-native store that serves viewing, inference, and training
        from a single conversion: ``level 0`` of each ``(modality, rate)`` group is
        the anti-aliased, per-modality-resampled inference signal, with a min/max
        render pyramid above it (flagged not-for-inference). A derived serving copy,
        not the archival source (BIDS/EDF stay authoritative). Requires the ``zarr``
        extra (zarr v3). See :class:`~biosigio.exporters.zarr.ZarrExporter` for the
        tuning knobs (``modality_rates``, ``dtype``, chunk/shard sizing, ...).

        Args:
            filepath: Output store path (``.zarr`` appended if missing).
            **kwargs: Forwarded to :meth:`ZarrExporter.export`.

        Returns:
            str: The written store path.
        """
        from ..exporters.zarr import ZarrExporter

        # The empty-signal guard lives once, in ZarrExporter.export ("No signals
        # loaded"), matching the tabular path; no duplicate guard here.
        return ZarrExporter.export(self, filepath, **kwargs)

    def set_metadata(self, key: str, value: Any) -> None:
        """
        Set metadata value.

        Args:
            key: Metadata key
            value: Metadata value
        """
        self.metadata[key] = value

    def get_metadata(self, key: str) -> Any:
        """
        Get metadata value.

        Args:
            key: Metadata key

        Returns:
            Value associated with the key
        """
        return self.metadata.get(key)

    def has_metadata(self, key: str) -> bool:
        """Return True if ``key`` is present in the recording metadata."""
        return key in self.metadata

    def get_n_channels(self) -> int:
        """Number of channels in the recording."""
        return len(self.channels)

    def get_n_samples(self) -> int:
        """Number of time samples per channel (0 if no signals are loaded)."""
        return 0 if self.signals is None else len(self.signals)

    def get_sampling_frequency(self) -> float:
        """Sampling frequency in Hz, when all channels share a single rate.

        Raises:
            ValueError: if no channels are loaded, or channels have differing
                sampling frequencies; for a mixed-rate recording read
                ``channels[ch]["sample_frequency"]`` per channel instead.
        """
        if not self.channels:
            raise ValueError("No channels loaded")
        rates = {info["sample_frequency"] for info in self.channels.values()}
        if len(rates) > 1:
            raise ValueError(
                "Channels have differing sampling frequencies; read "
                "channels[ch]['sample_frequency'] per channel instead."
            )
        return float(next(iter(rates)))

    def get_duration(self) -> float:
        """Total recording duration in seconds (n_samples / sampling_frequency).

        Computed from the time index spacing, so it is the full window length
        (one sample period longer than the last sample's timestamp). Returns 0.0
        when fewer than two samples are loaded (a single sample has no inferable
        sample period).
        """
        if self.signals is None or len(self.signals) < 2:
            return 0.0
        index = self.signals.index
        sample_period = float(index[1] - index[0])
        return len(index) * sample_period

    def add_channel(
        self,
        label: str,
        data: np.ndarray,
        sample_frequency: float,
        physical_dimension: str,
        channel_type: str,
        *,
        modality: str | None = None,
        prefilter: str = "n/a",
    ) -> None:
        """
        Add a new channel to the recording.

        Args:
            label: Channel label or name (as per EDF specification)
            data: Channel data
            sample_frequency: Sampling frequency in Hz (as per EDF specification)
            physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
            channel_type: BIDS channel type ('EEG', 'EMG', 'ECG', 'ACC', 'SEEG', ...).
                Required; validated against the modality vocabulary. There is no
                default (a missing type must be explicit, e.g. 'OTHER'/'MISC').
            modality: Coarse modality ('EEG', 'EMG', 'IEEG', 'MEG', 'BEH', 'MISC').
                If None, it is inferred from ``channel_type``.
            prefilter: Pre-filtering applied to the channel (keyword-only).
        """
        canonical_type = validate_channel_type(channel_type)
        canonical_modality = (
            infer_modality_from_channel_type(canonical_type)
            if modality is None
            else validate_modality(modality)
        )

        if self.signals is None:
            # Create DataFrame with time index
            time = np.arange(len(data)) / sample_frequency
            self.signals = pd.DataFrame(index=time)

        self.signals[label] = data
        self.channels[label] = {
            "sample_frequency": sample_frequency,
            "physical_dimension": physical_dimension,
            "prefilter": prefilter,
            "channel_type": canonical_type,
            "modality": canonical_modality,
        }

    def set_channel(
        self,
        label: str,
        *,
        channel_type: str | None = None,
        modality: str | None = None,
        physical_dimension: str | None = None,
        prefilter: str | None = None,
    ) -> None:
        """
        Update metadata of an existing channel (the supported relabel path).

        Args:
            label: Existing channel label.
            channel_type: New BIDS channel type (validated). When given without an
                explicit ``modality``, the modality is re-derived from it.
            modality: New coarse modality (validated).
            physical_dimension: New physical unit.
            prefilter: New prefilter string.

        Raises:
            KeyError: If ``label`` is not an existing channel.
            ValueError: If ``channel_type`` or ``modality`` is not in the
                modality vocabulary.
        """
        if label not in self.channels:
            raise KeyError(f"Channel not found: {label}")
        info = self.channels[label]
        if channel_type is not None:
            info["channel_type"] = validate_channel_type(channel_type)
            if modality is None:
                info["modality"] = infer_modality_from_channel_type(info["channel_type"])
        if modality is not None:
            info["modality"] = validate_modality(modality)
        if physical_dimension is not None:
            info["physical_dimension"] = physical_dimension
        if prefilter is not None:
            info["prefilter"] = prefilter

    def add_event(self, onset: float, duration: float, description: str) -> None:
        """
        Add an event/annotation to the recording.

        Args:
            onset: Event onset time in seconds.
            duration: Event duration in seconds.
            description: Event description string.
        """
        new_event = pd.DataFrame(
            [{"onset": float(onset), "duration": float(duration), "description": description}]
        )
        # Avoid concatenating onto the empty, object-dtype events frame, which
        # would coerce the numeric columns to object. Start from the typed
        # new_event when there are no existing events.
        if self.events is None or self.events.empty:
            self.events = new_event
        else:
            self.events = pd.concat([self.events, new_event], ignore_index=True)
        # Sort events by onset time for consistency
        self.events = self.events.sort_values(by="onset").reset_index(drop=True)
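
As a rough illustration of the factor arithmetic in `resample` above, the sketch below reproduces only the integer bookkeeping (rounding, gcd reduction, achieved rate) using the standard library. The helper name `rational_factors` is hypothetical and not part of biosigio; the filtering itself is left to `scipy.signal.resample_poly` and is not shown.

```python
from math import gcd


def rational_factors(source_rate: float, target_rate: float) -> tuple[int, int, float]:
    """Hypothetical helper mirroring the factor arithmetic in ``resample``."""
    src_i = int(round(source_rate))
    tgt_i = int(round(target_rate))
    g = gcd(src_i, tgt_i)
    up = tgt_i // g
    down = src_i // g
    # The achieved rate is what a polyphase resampler produces with these factors.
    actual_rate = source_rate * up / down
    return up, down, actual_rate


# 2000 Hz -> 500 Hz reduces to up=1, down=4.
print(rational_factors(2000.0, 500.0))
# 1000 Hz -> 256 Hz needs a non-trivial ratio: up=32, down=125.
print(rational_factors(1000.0, 256.0))
# A non-integer target snaps to the nearest achievable rate (250 Hz here).
print(rational_factors(1000.0, 250.4))
```

In the last call the achieved rate differs from the requested one, which is the situation in which `resample` logs a warning and stores the achieved rate on the channels.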

__init__()

Initialize an empty recording.

Source code in biosigio/core/emg.py
def __init__(self):
    """Initialize an empty recording."""
    self.signals = None
    self.metadata = {}
    self.channels = {}
    # Initialize events as an empty DataFrame with specified columns
    self.events = pd.DataFrame(columns=["onset", "duration", "description"])

add_channel(label, data, sample_frequency, physical_dimension, channel_type, *, modality=None, prefilter='n/a')

Add a new channel to the recording.

Args:
    label: Channel label or name (as per EDF specification)
    data: Channel data
    sample_frequency: Sampling frequency in Hz (as per EDF specification)
    physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
    channel_type: BIDS channel type ('EEG', 'EMG', 'ECG', 'ACC', 'SEEG', ...).
        Required; validated against the modality vocabulary. There is no
        default (a missing type must be explicit, e.g. 'OTHER'/'MISC').
    modality: Coarse modality ('EEG', 'EMG', 'IEEG', 'MEG', 'BEH', 'MISC').
        If None, it is inferred from channel_type.
    prefilter: Pre-filtering applied to the channel (keyword-only).

Source code in biosigio/core/emg.py
def add_channel(
    self,
    label: str,
    data: np.ndarray,
    sample_frequency: float,
    physical_dimension: str,
    channel_type: str,
    *,
    modality: str | None = None,
    prefilter: str = "n/a",
) -> None:
    """
    Add a new channel to the recording.

    Args:
        label: Channel label or name (as per EDF specification)
        data: Channel data
        sample_frequency: Sampling frequency in Hz (as per EDF specification)
        physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
        channel_type: BIDS channel type ('EEG', 'EMG', 'ECG', 'ACC', 'SEEG', ...).
            Required; validated against the modality vocabulary. There is no
            default (a missing type must be explicit, e.g. 'OTHER'/'MISC').
        modality: Coarse modality ('EEG', 'EMG', 'IEEG', 'MEG', 'BEH', 'MISC').
            If None, it is inferred from ``channel_type``.
        prefilter: Pre-filtering applied to the channel (keyword-only).
    """
    canonical_type = validate_channel_type(channel_type)
    canonical_modality = (
        infer_modality_from_channel_type(canonical_type)
        if modality is None
        else validate_modality(modality)
    )

    if self.signals is None:
        # Create DataFrame with time index
        time = np.arange(len(data)) / sample_frequency
        self.signals = pd.DataFrame(index=time)

    self.signals[label] = data
    self.channels[label] = {
        "sample_frequency": sample_frequency,
        "physical_dimension": physical_dimension,
        "prefilter": prefilter,
        "channel_type": canonical_type,
        "modality": canonical_modality,
    }
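
The time index that `add_channel` builds for the first channel, and the duration that `get_duration` derives from its spacing, can be pictured with plain Python. This is a minimal sketch under assumptions: `time_index` is a hypothetical stand-in for `np.arange(len(data)) / sample_frequency`, and a list replaces the pandas index.

```python
def time_index(n_samples: int, sample_frequency: float) -> list[float]:
    """Hypothetical stand-in for ``np.arange(n_samples) / sample_frequency``."""
    return [i / sample_frequency for i in range(n_samples)]


index = time_index(n_samples=4, sample_frequency=2.0)
print(index)  # [0.0, 0.5, 1.0, 1.5]

# Duration as n_samples * sample_period: one period past the last timestamp.
sample_period = index[1] - index[0]
duration = len(index) * sample_period
print(duration)  # 2.0
```

Per the source above, the index is created only while `signals` is still `None`, so it reflects the sampling frequency of the first channel added.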

add_event(onset, duration, description)

Add an event/annotation to the recording.

Args:
    onset: Event onset time in seconds.
    duration: Event duration in seconds.
    description: Event description string.

Source code in biosigio/core/emg.py
def add_event(self, onset: float, duration: float, description: str) -> None:
    """
    Add an event/annotation to the recording.

    Args:
        onset: Event onset time in seconds.
        duration: Event duration in seconds.
        description: Event description string.
    """
    new_event = pd.DataFrame(
        [{"onset": float(onset), "duration": float(duration), "description": description}]
    )
    # Avoid concatenating onto the empty, object-dtype events frame, which
    # would coerce the numeric columns to object. Start from the typed
    # new_event when there are no existing events.
    if self.events is None or self.events.empty:
        self.events = new_event
    else:
        self.events = pd.concat([self.events, new_event], ignore_index=True)
    # Sort events by onset time for consistency
    self.events = self.events.sort_values(by="onset").reset_index(drop=True)
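
The append-then-sort behaviour can be sketched without pandas. The list-based `add_event` below is a hypothetical stand-in, not biosigio's implementation; it only shows that onsets and durations are coerced to float and that events stay ordered by onset regardless of insertion order.

```python
events: list[dict] = []


def add_event(onset: float, duration: float, description: str) -> None:
    """Hypothetical list-based stand-in for ``Recording.add_event``."""
    events.append(
        {"onset": float(onset), "duration": float(duration), "description": description}
    )
    # Re-sort after every append so the events stay ordered by onset.
    events.sort(key=lambda event: event["onset"])


add_event(5, 1, "contraction")
add_event(0.5, 0, "start")
print([event["description"] for event in events])  # ['start', 'contraction']
```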

from_file(filepath, importer=None, force_csv=False, bids_channels='auto', **kwargs) classmethod

Create a Recording object from a file.

Args:
    filepath: Path to the input file
    importer: Name of the importer to use. Can be one of the following:
        - 'trigno': Delsys Trigno EMG system (CSV)
        - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
        - 'eeglab': EEGLAB .set files (SET)
        - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
        - 'csv': Generic CSV (or TXT) files with columnar data
        - 'wfdb': Waveform Database (WFDB)
        - 'xdf': XDF format (multi-stream Lab Streaming Layer files)
        - 'meg': MEG via MNE (.fif and CTF .ds; requires the 'meg' extra)
        - 'brainvision': BrainVision .vhdr via MNE (requires the 'meg' extra)
        - 'tabular': biosigIO Parquet/Arrow/Feather (requires the 'arrow' extra)
        - 'neo': proprietary electrophysiology formats via python-neo
          (Intan, Blackrock, Spike2, Plexon, Micromed, Neuralynx, ...;
          requires the 'neo' extra)
        - 'zarr': biosigIO Zarr serving store (requires the 'zarr' extra)
        If None, the importer will be inferred from the file extension.
        Automatic import is supported for CSV/TXT files.
    force_csv: If True and importer is 'csv', forces using the generic CSV
        importer even if the file appears to match a specialized format.
    bids_channels: When 'auto' (default), look for a sibling BIDS
        _channels.tsv next to the file and apply its per-channel type/units
        over the importer's inferred values. Pass 'off' to disable.
    **kwargs: Additional arguments passed to the importer.
        For XDF files, useful kwargs include:
        - stream_names: List of stream names to import
        - stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
        - stream_ids: List of stream IDs to import

Returns: Recording: New Recording object with loaded data
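
The name-to-class lookup behind the `importer` argument can be pictured as a small registry with on-demand imports. The following is a minimal sketch, not biosigio's code: `REGISTRY` and `resolve` are hypothetical names, `importlib.import_module` is used in place of `__import__`, and standard-library modules stand in for the real importer modules so the example runs anywhere.

```python
import importlib

# Hypothetical registry: importer name -> (module path, class name).
# Standard-library modules stand in for the real importer modules.
REGISTRY = {
    "json": ("json", "JSONDecoder"),
    "csv": ("csv", "DictReader"),
}


def resolve(name: str):
    """Look up an importer class by name, importing its module only on demand."""
    if name not in REGISTRY:
        raise ValueError(
            f"Unsupported importer: {name}. Available importers: {list(REGISTRY)}"
        )
    module_path, class_name = REGISTRY[name]
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


print(resolve("json").__name__)  # JSONDecoder
```

Deferring the import until a name is requested means an optional dependency is only needed when its importer is actually used.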

Source code in biosigio/core/emg.py
@classmethod
def from_file(
    cls,
    filepath: str,
    importer: ImporterName | None = None,
    force_csv: bool = False,
    bids_channels: str = "auto",
    **kwargs,
) -> "Recording":
    """
    Create a Recording object from a file.

    Args:
        filepath: Path to the input file
        importer: Name of the importer to use. Can be one of the following:
            - 'trigno': Delsys Trigno EMG system (CSV)
            - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
            - 'eeglab': EEGLAB .set files (SET)
            - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
            - 'csv': Generic CSV (or TXT) files with columnar data
            - 'wfdb': Waveform Database (WFDB)
            - 'xdf': XDF format (multi-stream Lab Streaming Layer files)
            - 'meg': MEG via MNE (.fif and CTF .ds; requires the 'meg' extra)
            - 'brainvision': BrainVision .vhdr via MNE (requires the 'meg' extra)
            - 'tabular': biosigIO Parquet/Arrow/Feather (requires the 'arrow' extra)
            - 'neo': proprietary electrophysiology formats via python-neo
              (Intan, Blackrock, Spike2, Plexon, Micromed, Neuralynx, ...;
              requires the 'neo' extra)
            - 'zarr': biosigIO Zarr serving store (requires the 'zarr' extra)
            If None, the importer will be inferred from the file extension.
            Automatic import is supported for CSV/TXT files.
        force_csv: If True and importer is 'csv', forces using the generic CSV
                  importer even if the file appears to match a specialized format.
        bids_channels: When 'auto' (default), look for a sibling BIDS
                  _channels.tsv next to the file and apply its per-channel
                  type/units over the importer's inferred values. Pass 'off'
                  to disable.
        **kwargs: Additional arguments passed to the importer.
            For XDF files, useful kwargs include:
            - stream_names: List of stream names to import
            - stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
            - stream_ids: List of stream IDs to import

    Returns:
        Recording: New Recording object with loaded data
    """
    if importer is None:
        importer = cls._infer_importer(filepath)

    importers = {
        "trigno": "TrignoImporter",  # CSV with Delsys Trigno Headers
        "otb": "OTBImporter",  # OTB/OTB+ EMG system data
        "edf": "EDFImporter",  # EDF/EDF+/BDF format
        "eeglab": "EEGLABImporter",  # EEGLAB .set files
        "csv": "CSVImporter",  # Generic CSV/Text files
        "wfdb": "WFDBImporter",  # Waveform Database format
        "xdf": "XDFImporter",  # XDF multi-stream format
        "meg": "MEGImporter",  # MEG via MNE (.fif, CTF .ds)
        "brainvision": "BrainVisionImporter",  # BrainVision via MNE (.vhdr)
        "tabular": "TabularImporter",  # biosigIO Parquet / Arrow / Feather
        "neo": "NeoImporter",  # proprietary ephys via python-neo
        "zarr": "ZarrImporter",  # biosigIO Zarr serving store
    }

    if importer not in importers:
        raise ValueError(
            f"Unsupported importer: {importer}. "
            f"Available importers: {list(importers.keys())}\n"
            "- trigno: Delsys Trigno EMG system\n"
            "- otb: OTB/OTB+ EMG system\n"
            "- edf: EDF/EDF+/BDF format\n"
            "- eeglab: EEGLAB .set files\n"
            "- csv: Generic CSV/Text files\n"
            "- wfdb: Waveform Database\n"
            "- xdf: XDF multi-stream format\n"
            "- meg: MEG via MNE (.fif, CTF .ds)\n"
            "- brainvision: BrainVision via MNE (.vhdr)\n"
            "- tabular: biosigIO Parquet/Arrow/Feather (.parquet, .feather, .arrow)\n"
            "- neo: proprietary electrophysiology formats via python-neo "
            "(Intan, Blackrock, Spike2, Plexon, Micromed, Neuralynx, ...)\n"
            "- zarr: biosigIO Zarr serving store (.zarr)"
        )

    # If using CSV importer and force_csv is set, pass it as force_generic
    if importer == "csv":
        kwargs["force_generic"] = force_csv

    # Import the appropriate importer class
    import importlib

    importer_module = importlib.import_module(f"biosigio.importers.{importer}")
    importer_class = getattr(importer_module, importers[importer])

    # Create importer instance and load data
    rec = importer_class().load(filepath, **kwargs)

    # Record provenance: which format this recording came from. setdefault so a
    # re-imported serialization file (tabular/zarr) keeps the ORIGINAL
    # source_format restored from its metadata rather than being relabeled.
    rec.metadata.setdefault("source_format", importer)

    # In a BIDS layout, the sibling _channels.tsv is the authoritative source
    # of per-channel type/units; apply it over the importer's header/label
    # guesses unless explicitly disabled with bids_channels="off".
    if bids_channels != "off":
        from ..bids import apply_channels_tsv, find_channels_tsv

        channels_tsv = find_channels_tsv(filepath)
        if channels_tsv:
            apply_channels_tsv(rec, channels_tsv)

    return rec
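
The lookup-then-import step above can be sketched in isolation. This is a minimal illustration, not the library's code: the registry below points at standard-library classes purely as stand-ins so the snippet runs anywhere, and `REGISTRY` and `resolve` are hypothetical names.

```python
import importlib

# Stand-in registry: key -> (module path, class name). The real table maps
# importer keys to classes under biosigio.importers; stdlib targets are used
# here only so the sketch is runnable without biosigio installed.
REGISTRY = {
    "json": ("json", "JSONDecoder"),
    "csv": ("csv", "DictReader"),
}


def resolve(key: str) -> type:
    """Look up `key` and import its module only when it is requested."""
    if key not in REGISTRY:
        raise ValueError(
            f"Unsupported importer: {key}. Available importers: {sorted(REGISTRY)}"
        )
    module_name, class_name = REGISTRY[key]
    module = importlib.import_module(module_name)  # deferred import
    return getattr(module, class_name)


decoder_cls = resolve("json")
print(decoder_cls.__name__)
```

Deferring the import until a key is requested is presumably what lets optional extras such as 'neo' or 'zarr' stay uninstalled until a caller actually asks for them.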

get_channel_types()

Get list of unique channel types in the data.

Returns: List of channel types (e.g., ['EMG', 'ACC', 'GYRO'])

Source code in biosigio/core/emg.py
def get_channel_types(self) -> list[str]:
    """
    Get list of unique channel types in the data.

    Returns:
        List of channel types (e.g., ['EMG', 'ACC', 'GYRO'])
    """
    return list({info["channel_type"] for info in self.channels.values()})

get_channels_by_modality(modality)

Get the channels belonging to a given modality.

Args: modality: Modality to filter by ('EEG', 'EMG', 'IEEG', 'MEG', 'BEH', 'MISC').

Returns: List of channel names of the specified modality.

Source code in biosigio/core/emg.py
def get_channels_by_modality(self, modality: str) -> list[str]:
    """
    Get the channels belonging to a given modality.

    Args:
        modality: Modality to filter by ('EEG', 'EMG', 'IEEG', 'MEG', 'BEH', 'MISC').

    Returns:
        List of channel names of the specified modality.
    """
    canonical_modality = validate_modality(modality)
    return [
        ch for ch, info in self.channels.items() if info.get("modality") == canonical_modality
    ]

get_channels_by_type(channel_type)

Get list of channels of a specific type.

Args: channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

Returns: List of channel names of the specified type

Source code in biosigio/core/emg.py
def get_channels_by_type(self, channel_type: str) -> list[str]:
    """
    Get list of channels of a specific type.

    Args:
        channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

    Returns:
        List of channel names of the specified type
    """
    return [ch for ch, info in self.channels.items() if info["channel_type"] == channel_type]
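
The two type helpers above can be mimicked on a plain dictionary. The sketch below assumes only that each channel entry carries a "channel_type" key, as in the source; the channel names and the free functions are hypothetical. Note that get_channel_types builds its list from a set, so the order of the returned types is not guaranteed; sorting gives a stable result.

```python
# Hypothetical channel table shaped like Recording.channels.
channels = {
    "EMG1": {"channel_type": "EMG"},
    "EMG2": {"channel_type": "EMG"},
    "ACC1": {"channel_type": "ACC"},
}


def channel_types(chs: dict) -> list[str]:
    """Unique channel types, sorted so the order is deterministic."""
    return sorted({info["channel_type"] for info in chs.values()})


def channels_of_type(chs: dict, channel_type: str) -> list[str]:
    """Channel names of one type, in the table's insertion order."""
    return [ch for ch, info in chs.items() if info["channel_type"] == channel_type]


print(channel_types(channels))
print(channels_of_type(channels, "EMG"))
```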

get_duration()

Total recording duration in seconds (n_samples / sampling_frequency).

Computed from the time index spacing, so it is the full window length (one sample period longer than the last sample's timestamp). Returns 0.0 when fewer than two samples are loaded (a single sample has no inferable sample period).

Source code in biosigio/core/emg.py
def get_duration(self) -> float:
    """Total recording duration in seconds (n_samples / sampling_frequency).

    Computed from the time index spacing, so it is the full window length
    (one sample period longer than the last sample's timestamp). Returns 0.0
    when fewer than two samples are loaded (a single sample has no inferable
    sample period).
    """
    if self.signals is None or len(self.signals) < 2:
        return 0.0
    index = self.signals.index
    sample_period = float(index[1] - index[0])
    return len(index) * sample_period
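
A small worked example of the duration arithmetic, using a plain list in place of the signals index (the 500 Hz rate and 1000-sample length are illustrative values, not taken from the library):

```python
import math

fs = 500.0        # illustrative sampling frequency in Hz
n_samples = 1000  # illustrative number of samples

# Time index in seconds, starting at 0 with uniform spacing 1 / fs.
index = [i / fs for i in range(n_samples)]

sample_period = index[1] - index[0]     # spacing between the first two samples
duration = len(index) * sample_period   # full window length
last_timestamp = index[-1]              # one sample period shorter

print(duration, last_timestamp)
assert math.isclose(duration, n_samples / fs)
```

The window length is 2 s while the last timestamp is 1.998 s, which is the "one sample period longer" distinction described above.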

get_metadata(key)

Get metadata value.

Args: key: Metadata key

Returns: Value associated with the key

Source code in biosigio/core/emg.py
def get_metadata(self, key: str) -> Any:
    """
    Get metadata value.

    Args:
        key: Metadata key

    Returns:
        Value associated with the key
    """
    return self.metadata.get(key)

get_modalities()

Get the list of unique modalities present in the data.

Returns: List of modalities (e.g., ['EEG', 'EMG', 'MISC']).

Source code in biosigio/core/emg.py
def get_modalities(self) -> list[str]:
    """
    Get the list of unique modalities present in the data.

    Returns:
        List of modalities (e.g., ['EEG', 'EMG', 'MISC']).
    """
    return list(
        {info.get("modality") for info in self.channels.values() if info.get("modality")}
    )

get_n_channels()

Number of channels in the recording.

Source code in biosigio/core/emg.py
def get_n_channels(self) -> int:
    """Number of channels in the recording."""
    return len(self.channels)

get_n_samples()

Number of time samples per channel (0 if no signals are loaded).

Source code in biosigio/core/emg.py
def get_n_samples(self) -> int:
    """Number of time samples per channel (0 if no signals are loaded)."""
    return 0 if self.signals is None else len(self.signals)

get_sampling_frequency()

Sampling frequency in Hz, when all channels share a single rate.

Raises: ValueError: if no channels are loaded, or channels have differing sampling frequencies; for a mixed-rate recording read channels[ch]["sample_frequency"] per channel instead.

Source code in biosigio/core/emg.py
def get_sampling_frequency(self) -> float:
    """Sampling frequency in Hz, when all channels share a single rate.

    Raises:
        ValueError: if no channels are loaded, or channels have differing
            sampling frequencies; for a mixed-rate recording read
            ``channels[ch]["sample_frequency"]`` per channel instead.
    """
    if not self.channels:
        raise ValueError("No channels loaded")
    rates = {info["sample_frequency"] for info in self.channels.values()}
    if len(rates) > 1:
        raise ValueError(
            "Channels have differing sampling frequencies; read "
            "channels[ch]['sample_frequency'] per channel instead."
        )
    return float(next(iter(rates)))
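
A caller that may receive mixed-rate recordings could fall back to the per-channel values, as the error message suggests. The sketch below reproduces the single-rate guard on a plain dictionary; `shared_rate` and `rates_by_channel` are hypothetical helper names, and only the "sample_frequency" key comes from the source.

```python
def shared_rate(channels: dict) -> float:
    """Return the one sampling rate shared by all channels, else raise."""
    if not channels:
        raise ValueError("No channels loaded")
    rates = {info["sample_frequency"] for info in channels.values()}
    if len(rates) > 1:
        raise ValueError(f"Channels have differing sampling frequencies: {sorted(rates)}")
    return float(next(iter(rates)))


def rates_by_channel(channels: dict) -> dict[str, float]:
    """Per-channel fallback for mixed-rate recordings."""
    return {ch: float(info["sample_frequency"]) for ch, info in channels.items()}


uniform = {"EMG1": {"sample_frequency": 2000}, "EMG2": {"sample_frequency": 2000}}
mixed = {"EMG1": {"sample_frequency": 2000}, "ACC1": {"sample_frequency": 148}}

print(shared_rate(uniform))
try:
    shared_rate(mixed)
except ValueError:
    print(rates_by_channel(mixed))
```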

has_metadata(key)

Return True if key is present in the recording metadata.

Source code in biosigio/core/emg.py
def has_metadata(self, key: str) -> bool:
    """Return True if ``key`` is present in the recording metadata."""
    return key in self.metadata
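
Because get_metadata is backed by dict.get, it returns None both for a missing key and for a key whose stored value is None; has_metadata is the way to tell the two apart. A minimal sketch on a plain dictionary (the keys and values are made up for illustration):

```python
# Hypothetical metadata store shaped like Recording.metadata.
metadata = {"subject": "sub-01", "notes": None}


def get_metadata(key):
    """Mirror of the accessor above: missing keys yield None."""
    return metadata.get(key)


def has_metadata(key) -> bool:
    """Mirror of the membership check above."""
    return key in metadata


# Both lookups return None, but only "notes" is actually present.
print(get_metadata("notes"), has_metadata("notes"))
print(get_metadata("device"), has_metadata("device"))
```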

plot_signals(channels=None, time_range=None, offset_scale=0.8, uniform_scale=True, detrend=False, grid=True, title=None, show=True, plt_module=None)

Plot signals in a single plot with vertical offsets.

Args:
    channels: List of channels to plot. If None, plot all channels.
    time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
    offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
    uniform_scale: Whether to use the same scale for all signals.
    detrend: Whether to remove mean from signals before plotting.
    grid: Whether to show grid lines.
    title: Optional title for the figure.
    show: Whether to display the plot.
    plt_module: Matplotlib pyplot module to use.

Source code in biosigio/core/emg.py
def plot_signals(
    self,
    channels=None,
    time_range=None,
    offset_scale=0.8,
    uniform_scale=True,
    detrend=False,
    grid=True,
    title=None,
    show=True,
    plt_module=None,
):
    """
    Plot signals in a single plot with vertical offsets.

    Args:
        channels: List of channels to plot. If None, plot all channels.
        time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
        offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
        uniform_scale: Whether to use the same scale for all signals.
        detrend: Whether to remove mean from signals before plotting.
        grid: Whether to show grid lines.
        title: Optional title for the figure.
        show: Whether to display the plot.
        plt_module: Matplotlib pyplot module to use.
    """
    # Delegate to the static plotting function in visualization module
    static_plot_signals(
        rec_object=self,
        channels=channels,
        time_range=time_range,
        offset_scale=offset_scale,
        uniform_scale=uniform_scale,
        detrend=detrend,
        grid=grid,
        title=title,
        show=show,
        plt_module=plt_module,
    )

resample(target_rate)

Return a NEW, anti-aliased down-sampled copy of this recording.

Low-resolution demos need a smaller, lighter recording; this rebuilds the uniform signal grid at target_rate using a polyphase resampler (scipy.signal.resample_poly), which applies a Kaiser-windowed sinc anti-alias FIR before decimation. A naive stride-decimation would fold energy above the new Nyquist back into the band (aliasing); resample_poly attenuates that energy first, which suppresses aliasing.

Non-destructive: self is left untouched and a new Recording is returned, mirroring select_channels's copy semantics.

Resampling factors come from the source and target rates, each rounded to the nearest integer: g = gcd(round(src), round(target)); up = round(target)//g; down = round(src)//g and resample_poly(x, up, down) runs once, vectorized over all channels along axis=0.

Args: target_rate: Desired sampling rate in Hz. Must be <= the source rate (this is a DOWN-sampling helper). A target equal to the source returns an unchanged copy; a target above it raises ValueError rather than silently up-sampling (up-sampling cannot recover detail and is out of scope for the low-res pipeline).

Returns: Recording: A new Recording with the resampled signals, each channel's sample_frequency set to the achieved rate (source * up / down, which equals target_rate for integer rates), and channel/recording metadata and events preserved. Events are unchanged because their onsets/durations are in SECONDS, which stay valid under any rate change (only the per-sample grid shrinks, not wall-clock time).

Raises: ValueError: If no signals are loaded, if target_rate is not positive, if channels do not share a single sample_frequency (biosigio stores one uniform grid; mixed-rate resampling is out of scope), or if target_rate exceeds the source rate.

Source code in biosigio/core/emg.py
def resample(self, target_rate: float) -> "Recording":
    """Return a NEW, anti-aliased down-sampled copy of this recording.

    Low-resolution demos need a smaller, lighter recording; this rebuilds the
    uniform signal grid at ``target_rate`` using a polyphase resampler
    (``scipy.signal.resample_poly``), which applies a Kaiser-windowed sinc
    anti-alias FIR before decimation. A naive stride-decimation would fold
    energy above the new Nyquist back into the band (aliasing); resample_poly
    attenuates that energy first, which suppresses aliasing.

    Non-destructive: ``self`` is left untouched and a new Recording is returned,
    mirroring ``select_channels``'s copy semantics.

    Resampling factors come from the source and target rates, each rounded to
    the nearest integer:
    ``g = gcd(round(src), round(target)); up = round(target)//g; down = round(src)//g``
    and ``resample_poly(x, up, down)`` runs once, vectorized over all channels
    along ``axis=0``.

    Args:
        target_rate: Desired sampling rate in Hz. Must be <= the source rate
            (this is a DOWN-sampling helper). A target equal to the source
            returns an unchanged copy; a target above it raises ``ValueError``
            rather than silently up-sampling (up-sampling cannot recover
            detail and is out of scope for the low-res pipeline).

    Returns:
        Recording: A new Recording with the resampled signals, each channel's
            ``sample_frequency`` set to the achieved rate (source * up / down,
            which equals ``target_rate`` for integer rates), and channel/recording
            metadata and events preserved. Events are unchanged because their
            onsets/durations are in SECONDS, which stay valid under any rate
            change (only the per-sample grid shrinks, not wall-clock time).

    Raises:
        ValueError: If no signals are loaded, if ``target_rate`` is not
            positive, if channels do not share a single ``sample_frequency``
            (biosigio stores one uniform grid; mixed-rate resampling is out of
            scope), or if ``target_rate`` exceeds the source rate.
    """
    from math import gcd

    from scipy.signal import resample_poly

    if self.signals is None:
        raise ValueError("No signals loaded")

    if target_rate <= 0:
        raise ValueError(f"target_rate must be positive, got {target_rate}")

    # biosigio stores all channels on one uniform-length grid; a per-channel rate
    # mix is out of scope here, matching the exporter's single-rate guard.
    distinct_rates = {info["sample_frequency"] for info in self.channels.values()}
    if len(distinct_rates) > 1:
        raise ValueError(
            "Resampling requires a single sampling rate across all channels, but "
            f"multiple were found: {sorted(distinct_rates)} Hz. biosigio stores one "
            "uniform grid; resample each rate group separately."
        )

    source_rate = float(next(iter(distinct_rates)))

    # Down-sampling only: refuse to up-sample/alias; an equal rate is a no-op
    # copy so callers can resample unconditionally without special-casing.
    if target_rate > source_rate:
        raise ValueError(
            f"target_rate {target_rate} Hz exceeds source rate {source_rate} Hz; "
            "resample() only down-samples (low-res). Up-sampling is out of scope."
        )

    new_rec = Recording()
    new_rec.channels = {ch: info.copy() for ch, info in self.channels.items()}
    new_rec.metadata = self.metadata.copy()
    # Onsets/durations are in SECONDS, so they remain valid after the grid
    # changes; copy them through unchanged.
    new_rec.events = self.events.copy() if self.events is not None else self.events

    if target_rate == source_rate:
        # No grid change: copy signals through untouched (fresh RangeIndex for
        # consistency with the resampled path).
        new_rec.signals = self.signals.copy().reset_index(drop=True)
        return new_rec

    # Rational resampling factors from the integer rates.
    src_i = int(round(source_rate))
    tgt_i = int(round(target_rate))
    g = gcd(src_i, tgt_i)
    up = tgt_i // g
    down = src_i // g

    # The achieved rate is exactly source * up / down. Store THAT, not the
    # requested float, so the metadata can never disagree with the data (a
    # non-integer or odd target snaps to the nearest achievable rational rate;
    # warn so the caller knows). This avoids silently writing e.g. 99.5 Hz
    # onto a grid that resample_poly actually produced at 100 Hz.
    actual_rate = source_rate * up / down
    if abs(actual_rate - target_rate) > 1e-9:
        logging.warning(
            "Requested resample to %g Hz; nearest achievable rational rate is "
            "%g Hz, which is what is stored on the channels.",
            target_rate,
            actual_rate,
        )

    columns = list(self.signals.columns)
    data = self.signals.to_numpy(dtype=float)
    # resample_poly over axis=0 resamples every channel column at once with the
    # shared anti-alias FIR.
    resampled = resample_poly(data, up, down, axis=0)

    new_rec.signals = pd.DataFrame(resampled, columns=columns)
    new_rec.signals.index = pd.RangeIndex(len(new_rec.signals))
    for info in new_rec.channels.values():
        info["sample_frequency"] = actual_rate

    return new_rec
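
The factor arithmetic in the listing above can be checked without running the resampler. The sketch below isolates only the rational-factor step; `resample_factors` is a hypothetical helper name, the rates are illustrative, and the actual filtering would still be done by scipy.signal.resample_poly.

```python
from math import gcd


def resample_factors(source_rate: float, target_rate: float) -> tuple[int, int, float]:
    """Return (up, down, achieved_rate) for a polyphase resample.

    Both rates are rounded to integers first, so a non-integer target snaps
    to the nearest achievable rational rate.
    """
    src_i = int(round(source_rate))
    tgt_i = int(round(target_rate))
    g = gcd(src_i, tgt_i)
    up = tgt_i // g
    down = src_i // g
    return up, down, source_rate * up / down


print(resample_factors(2048.0, 512.0))   # plain integer decimation
print(resample_factors(1000.0, 300.0))   # rational ratio, up > 1
print(resample_factors(1000.0, 250.4))   # non-integer target snaps to 250 Hz
```

In the last case the achieved rate differs from the request, which is the situation where the listing logs a warning and stores the achieved rate on the channels.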

select_channels(channels=None, channel_type=None, inplace=False, *, modality=None)

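Select channels by name, type, and/or modality. By default the selection is returned as a new Recording object.

Args:
    channels: Channel name or list of channel names to select. If None, all channels matching channel_type and/or modality are selected.
    channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.). If specified with channels, filters the selection to only channels of this type.
    inplace: If True, replace this Recording's signals, channels, and metadata with the selection and return self. If False (default), leave this Recording untouched.
    modality: Keyword-only. Modality to select ('EEG', 'EMG', 'IEEG', 'MEG', 'BEH', 'MISC'). If specified with channels or channel_type, filters the selection further.

Returns: Recording: A new Recording object containing only the selected channels, or self when inplace=True.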

Examples:

# Select specific channels
new_rec = rec.select_channels(['EMG1', 'ACC1'])

# Select all EMG channels
emg_only = rec.select_channels(channel_type='EMG')

# Select only the EMG channels among those named; ACC1 is filtered out
emg_subset = rec.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
Source code in biosigio/core/emg.py
def select_channels(
    self,
    channels: str | list[str] | None = None,
    channel_type: str | None = None,
    inplace: bool = False,
    *,
    modality: str | None = None,
) -> "Recording":
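    """
    Select channels by name, type, and/or modality.

    By default the selection is returned as a new Recording object.

    Args:
        channels: Channel name or list of channel names to select. If None,
                all channels matching channel_type and/or modality are selected.
        channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.).
                    If specified with channels, filters the selection to only
                    channels of this type.
        inplace: If True, replace this Recording's signals, channels, and
                metadata with the selection and return ``self``. If False
                (default), leave this Recording untouched.
        modality: Keyword-only. Modality to select ('EEG', 'EMG', 'IEEG', 'MEG',
                'BEH', 'MISC'). If specified with channels or channel_type,
                filters the selection further.

    Returns:
        Recording: A new Recording object containing only the selected channels,
            or ``self`` when ``inplace=True``.

    Raises:
        ValueError: If no signals are loaded, if no selector is given, if a
            named channel does not exist, or if nothing matches the filters.

    Examples:
        # Select specific channels
        new_rec = rec.select_channels(['EMG1', 'ACC1'])

        # Select all EMG channels
        emg_only = rec.select_channels(channel_type='EMG')

        # Select only the EMG channels among those named; ACC1 is filtered out
        emg_subset = rec.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
    """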
    if self.signals is None:
        raise ValueError("No signals loaded")

    if channels is None and channel_type is None and modality is None:
        raise ValueError("Specify at least one of: channels, channel_type, or modality.")

    # If type/modality specified but no channels, select all matching channels
    if channels is None and channel_type is not None:
        channels = self.get_channels_by_type(channel_type)
        if not channels:
            raise ValueError(f"No channels found of type: {channel_type}")
    elif channels is None and modality is not None:
        channels = self.get_channels_by_modality(modality)
        if not channels:
            raise ValueError(f"No channels found of modality: {modality}")
    elif isinstance(channels, str):
        channels = [channels]

    if channels is None:
        raise ValueError("Specify at least one of: channels, channel_type, or modality.")

    # Validate channels exist
    if not all(ch in self.signals.columns for ch in channels):
        missing = [ch for ch in channels if ch not in self.signals.columns]
        raise ValueError(f"Channels not found: {missing}")

    # Filter by type if specified
    if channel_type is not None:
        channels = [ch for ch in channels if self.channels[ch]["channel_type"] == channel_type]
        if not channels:
            raise ValueError(f"None of the selected channels are of type: {channel_type}")

    # Filter by modality if specified
    if modality is not None:
        canonical_modality = validate_modality(modality)
        channels = [
            ch for ch in channels if self.channels[ch].get("modality") == canonical_modality
        ]
        if not channels:
            raise ValueError(f"None of the selected channels are of modality: {modality}")

    # Create new Recording object
    new_rec = Recording()

    # Copy selected signals and channels
    new_rec.signals = self.signals[channels].copy()
    new_rec.channels = {ch: self.channels[ch].copy() for ch in channels}

    # Copy metadata
    new_rec.metadata = self.metadata.copy()

    if not inplace:
        return new_rec
    else:
        self.signals = new_rec.signals
        self.channels = new_rec.channels
        self.metadata = new_rec.metadata
        return self
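
The way names and the type filter combine can be sketched on a plain dictionary. This is a simplified stand-in, not the method itself: it omits the modality filter and the signal copying, and `pick` and the channel names are hypothetical.

```python
# Hypothetical channel table shaped like Recording.channels.
channels = {
    "EMG1": {"channel_type": "EMG"},
    "EMG2": {"channel_type": "EMG"},
    "ACC1": {"channel_type": "ACC"},
}


def pick(chs: dict, names=None, channel_type=None) -> list[str]:
    """Resolve a selection: start from names (or all of a type), then filter."""
    if names is None and channel_type is None:
        raise ValueError("Specify at least one of: names, channel_type.")
    if names is None:
        names = [ch for ch, info in chs.items() if info["channel_type"] == channel_type]
    elif isinstance(names, str):
        names = [names]  # accept a single name as well as a list
    missing = [ch for ch in names if ch not in chs]
    if missing:
        raise ValueError(f"Channels not found: {missing}")
    if channel_type is not None:
        names = [ch for ch in names if chs[ch]["channel_type"] == channel_type]
    if not names:
        raise ValueError(f"No channels match type: {channel_type}")
    return names


print(pick(channels, ["EMG1", "ACC1"]))
print(pick(channels, channel_type="EMG"))
print(pick(channels, ["EMG1", "ACC1"], channel_type="EMG"))
```

Naming a channel and giving a type acts as an intersection, so a named channel of the wrong type is dropped rather than reported as an error, unless the selection ends up empty.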

set_channel(label, *, channel_type=None, modality=None, physical_dimension=None, prefilter=None)

Update metadata of an existing channel (the supported relabel path).

Args: label: Existing channel label. channel_type: New BIDS channel type (validated). When given without an explicit modality, the modality is re-derived from it. modality: New coarse modality (validated). physical_dimension: New physical unit. prefilter: New prefilter string.

Raises: KeyError: If label is not an existing channel. ValueError: If channel_type or modality is not in the modality vocabulary.

Source code in biosigio/core/emg.py
def set_channel(
    self,
    label: str,
    *,
    channel_type: str | None = None,
    modality: str | None = None,
    physical_dimension: str | None = None,
    prefilter: str | None = None,
) -> None:
    """
    Update metadata of an existing channel (the supported relabel path).

    Args:
        label: Existing channel label.
        channel_type: New BIDS channel type (validated). When given without an
            explicit ``modality``, the modality is re-derived from it.
        modality: New coarse modality (validated).
        physical_dimension: New physical unit.
        prefilter: New prefilter string.

    Raises:
        KeyError: If ``label`` is not an existing channel.
        ValueError: If ``channel_type`` or ``modality`` is not in the
            modality vocabulary.
    """
    if label not in self.channels:
        raise KeyError(f"Channel not found: {label}")
    info = self.channels[label]
    if channel_type is not None:
        info["channel_type"] = validate_channel_type(channel_type)
        if modality is None:
            info["modality"] = infer_modality_from_channel_type(info["channel_type"])
    if modality is not None:
        info["modality"] = validate_modality(modality)
    if physical_dimension is not None:
        info["physical_dimension"] = physical_dimension
    if prefilter is not None:
        info["prefilter"] = prefilter

set_metadata(key, value)

Set metadata value.

Args: key: Metadata key value: Metadata value

Source code in biosigio/core/emg.py
def set_metadata(self, key: str, value: Any) -> None:
    """
    Set metadata value.

    Args:
        key: Metadata key
        value: Metadata value
    """
    self.metadata[key] = value

to_arrow(filepath)

Export to a biosigIO Arrow/Feather file (fast zero-copy IPC).

Same self-describing schema as :meth:to_parquet; round-trips via Recording.from_file. Requires the arrow extra (pyarrow).

Args: filepath: Output .feather / .arrow path.

Returns: str: The written file path.

Source code in biosigio/core/emg.py
def to_arrow(self, filepath: str) -> str:
    """Export to a biosigIO Arrow/Feather file (fast zero-copy IPC).

    Same self-describing schema as :meth:`to_parquet`; round-trips via
    ``Recording.from_file``. Requires the ``arrow`` extra (pyarrow).

    Args:
        filepath: Output ``.feather`` / ``.arrow`` path.

    Returns:
        str: The written file path.
    """
    from ..exporters.tabular import TabularExporter

    return TabularExporter.to_arrow(self, filepath)

to_edf(filepath, method='both', fft_noise_range=None, svd_rank=None, precision_threshold=0.01, format='auto', bypass_analysis=None, verify=False, verify_tolerance=1e-06, verify_channel_map=None, verify_plot=False, events_df=None, create_channels_tsv=True, clip_outliers='auto', **kwargs)

Export the recording to EDF/BDF format, optionally including events.

Args:
    filepath: Path to save the EDF/BDF file
    method: Method for signal analysis ('svd', 'fft', or 'both')
        'svd': Uses Singular Value Decomposition for noise floor estimation
        'fft': Uses Fast Fourier Transform for noise floor estimation
        'both': Uses both methods and takes the minimum noise floor (default)
    fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
    svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
    precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
    format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'. If 'edf' or 'bdf' is specified, that format will be used directly. If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based on signal analysis to minimize precision loss while preferring EDF if sufficient.
    bypass_analysis: If True, skip signal analysis step when format is explicitly set to 'edf' or 'bdf'. If None (default), analysis is skipped automatically when format is forced. Set to False to force analysis even with a specified format. Ignored if format='auto'.
    verify: If True, reload the exported file and compare signals with the original to check for data integrity loss. Results are printed. (default: False)
    verify_tolerance: Absolute tolerance used when comparing signals during verification. (default: 1e-6)
    verify_channel_map: Optional dictionary mapping original channel names (keys) to reloaded channel names (values) for verification. Used if verify is True and channel names might differ.
    verify_plot: If True and verify is True, plots a comparison of original vs reloaded signals.
    events_df: Optional DataFrame with events ('onset', 'duration', 'description'). If None, uses self.events. (This provides flexibility)
    create_channels_tsv: If True, create a BIDS-compliant channels.tsv file (default: True)
    clip_outliers: Singularity handling for the per-channel physical window. 'auto' (default) keeps the full range losslessly but clips rare extreme outliers to a robust window only when keeping them would crater the bulk signal's resolution at the chosen format (with a warning); True always clips to the robust window; False never clips. See EDFExporter.export for the advanced outlier_sigmas / min_effective_bits knobs.
    **kwargs: Additional arguments for the EDF exporter

Returns: dict | None: If verify is True, a dictionary of verification results (including a 'channel_summary' entry). Otherwise, returns None.

Raises: ValueError: If no signals are loaded

Source code in biosigio/core/emg.py
def to_edf(
    self,
    filepath: str,
    method: str = "both",
    fft_noise_range: tuple | None = None,
    svd_rank: int | None = None,
    precision_threshold: float = 0.01,
    format: Literal["auto", "edf", "bdf"] = "auto",
    bypass_analysis: bool | None = None,
    verify: bool = False,
    verify_tolerance: float = 1e-6,
    verify_channel_map: dict[str, str] | None = None,
    verify_plot: bool = False,
    events_df: pd.DataFrame | None = None,
    create_channels_tsv: bool = True,
    clip_outliers: bool | str = "auto",
    **kwargs,
) -> dict | None:
    """
    Export the recording to EDF/BDF format, optionally including events.

    Args:
        filepath: Path to save the EDF/BDF file
        method: Method for signal analysis ('svd', 'fft', or 'both')
            'svd': Uses Singular Value Decomposition for noise floor estimation
            'fft': Uses Fast Fourier Transform for noise floor estimation
            'both': Uses both methods and takes the minimum noise floor (default)
        fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
        svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
        precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
        format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
                If 'edf' or 'bdf' is specified, that format will be used directly.
                If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based
                on signal analysis to minimize precision loss while preferring EDF
                if sufficient.
        bypass_analysis: If True, skip signal analysis step when format is explicitly
                         set to 'edf' or 'bdf'. If None (default), analysis is skipped
                         automatically when format is forced. Set to False to force
                         analysis even with a specified format. Ignored if format='auto'.
        verify: If True, reload the exported file and compare signals with the original
                to check for data integrity loss. Results are printed. (default: False)
        verify_tolerance: Absolute tolerance used when comparing signals during verification. (default: 1e-6)
        verify_channel_map: Optional dictionary mapping original channel names (keys)
                            to reloaded channel names (values) for verification.
                            Used if `verify` is True and channel names might differ.
        verify_plot: If True and verify is True, plots a comparison of original vs reloaded signals.
        events_df: Optional DataFrame with events ('onset', 'duration', 'description').
                  If None, uses self.events. (This provides flexibility)
        create_channels_tsv: If True, create a BIDS-compliant channels.tsv file (default: True)
        clip_outliers: Singularity handling for the per-channel physical window.
            'auto' (default) keeps the full range losslessly but clips rare extreme
            outliers to a robust window only when keeping them would crater the bulk
            signal's resolution at the chosen format (with a warning); True always
            clips to the robust window; False never clips. See EDFExporter.export for
            the advanced ``outlier_sigmas`` / ``min_effective_bits`` knobs.
        **kwargs: Additional arguments for the EDF exporter

    Returns:
        dict | None: If verify is True, a dictionary of verification results
                     (including a 'channel_summary' entry). Otherwise, returns None.

    Raises:
        ValueError: If no signals are loaded
    """
    from ..exporters.edf import EDFExporter  # Local import

    if self.signals is None:
        raise ValueError("No signals loaded")

    # --- Determine if analysis should be bypassed ---
    final_bypass_analysis = False
    if format.lower() == "auto":
        if bypass_analysis is True:
            logging.warning(
                "bypass_analysis=True ignored because format='auto'. Analysis is required."
            )
        # Analysis is always needed for 'auto' format
        final_bypass_analysis = False
    elif format.lower() in ["edf", "bdf"]:
        if bypass_analysis is None:
            # Default behaviour: skip analysis if format is forced
            final_bypass_analysis = True
            msg = (
                f"Format forced to '{format}'. Skipping signal analysis for faster export. "
                "Set bypass_analysis=False to force analysis."
            )
            logging.log(logging.CRITICAL, msg)
        elif bypass_analysis is True:
            final_bypass_analysis = True
            logging.log(logging.CRITICAL, "bypass_analysis=True set. Skipping signal analysis.")
        else:  # bypass_analysis is False
            final_bypass_analysis = False
            logging.info(
                f"Format forced to '{format}' but bypass_analysis=False. Performing signal analysis."
            )
    else:
        # The Literal type hint is not enforced at runtime, so guard against
        # unexpected values here.
        logging.warning(
            f"Unknown format '{format}'. Defaulting to 'auto' behavior (analysis enabled)."
        )
        format = "auto"
        final_bypass_analysis = False

    # Determine which events DataFrame to use
    if events_df is None:
        events_to_export = self.events
    else:
        events_to_export = events_df

    # Combine parameters
    all_params: dict[str, Any] = {
        "precision_threshold": precision_threshold,
        "method": method,
        "fft_noise_range": fft_noise_range,
        "svd_rank": svd_rank,
        "format": format,
        "bypass_analysis": final_bypass_analysis,
        "events_df": events_to_export,  # Pass the events dataframe
        "create_channels_tsv": create_channels_tsv,
        "clip_outliers": clip_outliers,
        **kwargs,
    }

    EDFExporter.export(self, filepath, **all_params)

    verification_report_dict = None
    if verify:
        logging.info(f"Verification requested. Reloading exported file: {filepath}")
        try:
            # Reload the exported file
            reloaded_rec = Recording.from_file(filepath, importer="edf")

            logging.info("Comparing original signals with reloaded signals...")
            # Compare signals using the imported function
            verification_results = compare_signals(
                self, reloaded_rec, tolerance=verify_tolerance, channel_map=verify_channel_map
            )

            # Generate and log report using the imported function
            report_verification_results(verification_results, verify_tolerance)
            verification_report_dict = verification_results

            # Plot comparison using imported function if requested
            summary = verification_results.get("channel_summary", {})
            comparison_mode = summary.get("comparison_mode", "unknown")
            compared_count = sum(1 for k in verification_results if k != "channel_summary")

            if verify_plot and compared_count > 0 and comparison_mode != "failed":
                plot_comparison(self, reloaded_rec, channel_map=verify_channel_map)
            elif verify_plot:
                logging.warning(
                    "Skipping verification plot: No channels were successfully compared."
                )

        except Exception as e:
            logging.error(f"Verification failed during reload or comparison: {e}")
            verification_report_dict = {
                "error": str(e),
                "channel_summary": {"comparison_mode": "failed"},
            }

    return verification_report_dict

to_parquet(filepath)

Export to a self-describing biosigIO Parquet file.

Signals are stored as a columnar table (channels = columns, time index preserved); channels/events/metadata travel in the file's schema metadata, so Recording.from_file round-trips it losslessly. Great for analytics (DuckDB/Polars/pandas/Spark). Requires the arrow extra (pyarrow).

Args: filepath: Output .parquet path.

Returns: str: The written file path.

Source code in biosigio/core/emg.py
def to_parquet(self, filepath: str) -> str:
    """Export to a self-describing biosigIO Parquet file.

    Signals are stored as a columnar table (channels = columns, time index
    preserved); channels/events/metadata travel in the file's schema metadata,
    so ``Recording.from_file`` round-trips it losslessly. Great for analytics
    (DuckDB/Polars/pandas/Spark). Requires the ``arrow`` extra (pyarrow).

    Args:
        filepath: Output ``.parquet`` path.

    Returns:
        str: The written file path.
    """
    from ..exporters.tabular import TabularExporter

    return TabularExporter.to_parquet(self, filepath)

to_zarr(filepath, **kwargs)

Export to a sharded Zarr v3 serving store with a min/max view pyramid.

Writes one cloud-native store that serves viewing, inference, and training from a single conversion: level 0 of each (modality, rate) group is the anti-aliased, per-modality-resampled inference signal, with a min/max render pyramid above it (flagged not-for-inference). A derived serving copy, not the archival source (BIDS/EDF stay authoritative). Requires the zarr extra (zarr v3). See biosigio.exporters.zarr.ZarrExporter for the tuning knobs (modality_rates, dtype, chunk/shard sizing, ...).

Args:

  • filepath: Output store path (.zarr appended if missing).
  • **kwargs: Forwarded to ZarrExporter.export.

Returns: str: The written store path.

Source code in biosigio/core/emg.py
def to_zarr(self, filepath: str, **kwargs) -> str:
    """Export to a sharded Zarr v3 serving store with a min/max view pyramid.

    Writes one cloud-native store that serves viewing, inference, and training
    from a single conversion: ``level 0`` of each ``(modality, rate)`` group is
    the anti-aliased, per-modality-resampled inference signal, with a min/max
    render pyramid above it (flagged not-for-inference). A derived serving copy,
    not the archival source (BIDS/EDF stay authoritative). Requires the ``zarr``
    extra (zarr v3). See :class:`~biosigio.exporters.zarr.ZarrExporter` for the
    tuning knobs (``modality_rates``, ``dtype``, chunk/shard sizing, ...).

    Args:
        filepath: Output store path (``.zarr`` appended if missing).
        **kwargs: Forwarded to :meth:`ZarrExporter.export`.

    Returns:
        str: The written store path.
    """
    from ..exporters.zarr import ZarrExporter

    # The empty-signal guard lives once, in ZarrExporter.export ("No signals
    # loaded"), matching the tabular path; no duplicate guard here.
    return ZarrExporter.export(self, filepath, **kwargs)

_determine_scaling_factors(signal_min, signal_max, use_bdf=False)

Compute EDF/BDF header scaling for a physical window [signal_min, signal_max].

Returns (physical_min, physical_max, digital_min, digital_max, scaling_factor). physical_min/physical_max are rounded OUTWARD to the 8-char header field so the stored window always brackets the input window; this is what stops pyedflib from saturating (clipping) the signal, which was the source of the per-channel corruption in issue #61. The caller chooses the window (full range by default, or a robust window when clipping genuine outliers via _resolve_physical_window). scaling_factor is informational only: pyedflib derives its own digitization from the physical/digital ranges and ignores this value.

Args:

  • signal_min: Minimum physical value the window must contain.
  • signal_max: Maximum physical value the window must contain.
  • use_bdf: Whether to use BDF (24-bit) digital range.

Returns: tuple: (physical_min, physical_max, digital_min, digital_max, scaling_factor)

Source code in biosigio/exporters/edf.py
def _determine_scaling_factors(
    signal_min: float, signal_max: float, use_bdf: bool = False
) -> tuple:
    """Compute EDF/BDF header scaling for a physical window [signal_min, signal_max].

    Returns ``(physical_min, physical_max, digital_min, digital_max,
    scaling_factor)``. physical_min/physical_max are rounded OUTWARD to the
    8-char header field so the stored window always brackets the input window;
    this is what stops pyedflib from saturating (clipping) the signal, which was
    the source of the per-channel corruption in issue #61. The caller chooses the
    window (full range by default, or a robust window when clipping genuine
    outliers via :func:`_resolve_physical_window`). ``scaling_factor`` is
    informational only: pyedflib derives its own digitization from the
    physical/digital ranges and ignores this value.

    Args:
        signal_min: Minimum physical value the window must contain.
        signal_max: Maximum physical value the window must contain.
        use_bdf: Whether to use BDF (24-bit) digital range.

    Returns:
        tuple: (physical_min, physical_max, digital_min, digital_max, scaling_factor)
    """
    if np.isnan(signal_min):
        signal_min = -1e-6
    if np.isnan(signal_max):
        signal_max = 1e-6
    if signal_min > signal_max:
        signal_min, signal_max = signal_max, signal_min

    if use_bdf:
        digital_min, digital_max = -8388608, 8388607  # 24-bit
    else:
        digital_min, digital_max = -32768, 32767  # 16-bit
    digital_range = digital_max - digital_min

    if np.isclose(signal_min, signal_max):
        if np.isclose(signal_min, 0.0):
            # Zero signal: minimal symmetric range around zero.
            phys_min, phys_max = -1e-6, 1e-6
        else:
            # Constant non-zero signal: a 1% margin on each side keeps a valid,
            # signal-scaled range that brackets the value.
            margin = abs(signal_min) * 0.01
            phys_min = _fit_physical_bound(signal_min - margin, round_up=False)
            phys_max = _fit_physical_bound(signal_max + margin, round_up=True)
    else:
        phys_min = _fit_physical_bound(signal_min, round_up=False)
        phys_max = _fit_physical_bound(signal_max, round_up=True)

    # Guarantee a strictly positive physical range (pyedflib requires
    # physical_min != physical_max), widening outward if rounding collapsed it.
    if not phys_min < phys_max:
        bump = max(abs(phys_min), abs(phys_max), 1e-6) * 1e-3
        phys_max = _fit_physical_bound(phys_min + bump, round_up=True)
        if not phys_min < phys_max:
            phys_max = phys_min + bump

    scaling_factor = (digital_range - 1) / (phys_max - phys_min)
    return phys_min, phys_max, digital_min, digital_max, scaling_factor
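To make the role of these header values concrete, here is a minimal sketch of the affine mapping that EDF/BDF define between the physical and digital ranges. It is illustrative only: `digitize` and `reconstruct` are hypothetical helper names, the window [-500, 500] is made up, and pyedflib's own rounding may differ in detail.

```python
def digitize(value, phys_min, phys_max, dig_min, dig_max):
    """Map a physical value to an integer digital code (affine EDF/BDF mapping)."""
    gain = (phys_max - phys_min) / (dig_max - dig_min)  # physical units per digital step
    code = round((value - phys_min) / gain) + dig_min
    # Samples outside the physical window saturate at the digital limits.
    return max(dig_min, min(dig_max, code))


def reconstruct(code, phys_min, phys_max, dig_min, dig_max):
    """Invert the mapping: digital code back to a physical value."""
    gain = (phys_max - phys_min) / (dig_max - dig_min)
    return phys_min + (code - dig_min) * gain


# Hypothetical channel whose header window is [-500, 500] (for example microvolts).
phys_min, phys_max = -500.0, 500.0
edf = (-32768, 32767)      # 16-bit digital range, as in the function above
bdf = (-8388608, 8388607)  # 24-bit digital range, as in the function above

for name, (dmin, dmax) in (("EDF", edf), ("BDF", bdf)):
    step = (phys_max - phys_min) / (dmax - dmin)
    code = digitize(123.456, phys_min, phys_max, dmin, dmax)
    back = reconstruct(code, phys_min, phys_max, dmin, dmax)
    print(f"{name}: step={step:.6g}, code={code}, reconstructed={back:.6f}")

# A sample outside the window saturates at the top code instead of wrapping.
clipped = digitize(750.0, phys_min, phys_max, *edf)
```

The same window yields a quantization step 256 times finer in BDF than in EDF, which is why a window that brackets the data tightly matters most for 16-bit exports.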

_fit_physical_bound(value, *, round_up, max_chars=_PHYS_FIELD_CHARS)

Round a physical bound OUTWARD to fit the EDF/BDF 8-char header field.

pyedflib stores physical_min/physical_max as short ASCII and truncates any value whose str() exceeds 8 characters, while any sample outside [physical_min, physical_max] is saturated (clipped) on write. To guarantee the stored window always brackets the signal we round AWAY from the data: round_up=True returns the smallest representable value >= value (use for physical_max); round_up=False returns the largest <= value (use for physical_min). The result's str() fits max_chars whenever the magnitude is representable; an integer part with more than max_chars digits (|value| >= 1e8, or >= 1e7 once a sign is added) cannot fit, so the tightest outward value is returned and the exporter rejects it with a clear unit-rescale error rather than letting pyedflib truncate it. Containment (the bracket) always holds. Because the digital range is mapped onto [physical_min, physical_max], outward rounding is an affine rescale of the reconstruction and barely affects correlation; clipping (which this prevents) is what destroys signal fidelity.

Source code in biosigio/exporters/edf.py
def _fit_physical_bound(value: float, *, round_up: bool, max_chars: int = _PHYS_FIELD_CHARS):
    """Round a physical bound OUTWARD to fit the EDF/BDF 8-char header field.

    pyedflib stores physical_min/physical_max as short ASCII and truncates any
    value whose ``str()`` exceeds 8 characters, while any sample outside
    [physical_min, physical_max] is saturated (clipped) on write. To guarantee
    the stored window always brackets the signal we round AWAY from the data:
    ``round_up=True`` returns the smallest representable value ``>= value`` (use
    for physical_max); ``round_up=False`` returns the largest ``<= value`` (use
    for physical_min). The result's ``str()`` fits ``max_chars`` whenever the
    magnitude is representable; an integer part with more than ``max_chars`` digits
    (|value| >= 1e8, or >= 1e7 once a sign is added) cannot fit, so the tightest
    outward value is returned and the exporter rejects it with a clear unit-rescale
    error rather than letting pyedflib truncate it. Containment (the bracket) always
    holds. Because the digital range is mapped onto [physical_min, physical_max],
    outward rounding is an affine rescale of the reconstruction and barely affects
    correlation; clipping (which this prevents) is what destroys signal fidelity.
    """
    if value == 0 or not math.isfinite(value):
        return 0.0
    rounding = ROUND_CEILING if round_up else ROUND_FLOOR
    dec = Decimal(repr(float(value)))
    exponent = dec.adjusted()  # power of ten of the most-significant digit
    # Prefer the most significant figures that still fit (tightest bracket),
    # stepping down to coarser rounding until the str() fits the field.
    for sig in range(max_chars, 0, -1):
        quantum = Decimal(1).scaleb(exponent - sig + 1)
        try:
            rounded = dec.quantize(quantum, rounding=rounding)
        except InvalidOperation:
            continue
        out = int(rounded) if rounded == rounded.to_integral_value() else float(rounded)
        # float() can land one ULP on the wrong side of the decimal; if so, take
        # one more whole quantum outward so the bracket invariant always holds.
        if round_up and out < value:
            out = float(rounded + quantum)
        elif not round_up and out > value:
            out = float(rounded - quantum)
        if len(str(out)) <= max_chars:
            return out
    # Extreme magnitude (more integer digits than the field holds): fall back to
    # a single significant figure, still rounded outward.
    quantum = Decimal(1).scaleb(exponent)
    rounded = dec.quantize(quantum, rounding=rounding)
    return int(rounded) if rounded == rounded.to_integral_value() else float(rounded)
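A simplified sketch of the outward-rounding idea, assuming a fixed number of decimal places rather than the significant-figure search used above. `round_outward` is a hypothetical helper for illustration, not part of biosigio.

```python
from decimal import Decimal, ROUND_CEILING, ROUND_FLOOR


def round_outward(value: float, places: int, *, round_up: bool) -> float:
    """Round to `places` decimals, away from the data (ceiling for max, floor for min)."""
    quantum = Decimal(1).scaleb(-places)  # places=3 gives Decimal("0.001")
    mode = ROUND_CEILING if round_up else ROUND_FLOOR
    return float(Decimal(repr(value)).quantize(quantum, rounding=mode))


# Made-up signal extremes with more digits than the 8-char header field can hold.
signal_min, signal_max = -123.456789, 987.654321

lo = round_outward(signal_min, 3, round_up=False)  # floor: moves further negative
hi = round_outward(signal_max, 3, round_up=True)   # ceiling: moves further positive
nearest_hi = round(signal_max, 2)                  # ordinary rounding, for contrast

print(lo, hi, nearest_hi)
```

Ordinary rounding produces an upper bound that sits below the true maximum, so the peak sample would saturate on write. The outward-rounded bounds still fit in eight characters and bracket the data on both sides.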

_resolve_physical_window(signal, use_bdf, clip_outliers, outlier_sigmas, min_effective_bits)

Decide the physical window [lo, hi] the header bounds will bracket.

EDF/BDF map the whole digital range onto [physical_min, physical_max], so a single extreme outlier inflates the range and starves the bulk signal of quantization levels. The robust window is the min/max of the inliers - samples within outlier_sigmas robust standard deviations (1.4826 x median-absolute-deviation) of the median - so only genuine singularities fall outside it, not a fixed fraction of legitimate samples. For a (near-)constant bulk the MAD is zero, so a tight percentile band is used instead, which still isolates sparse spikes on a flat channel.

  • clip_outliers=False: always the full data range (purely lossless).
  • clip_outliers="auto" (default): the full range, UNLESS keeping it would push the bulk below min_effective_bits of resolution at the chosen format, in which case the singular outliers are clipped to the robust window so the recording survives.
  • clip_outliers=True: clip to the robust window whenever any sample lies outside it (a no-op when there are no outliers).

Returns (lo, hi, n_clipped, max_excursion); n_clipped counts the samples that will saturate and max_excursion is how far the worst one lies beyond the window (both only for the caller's warning/report).

Source code in biosigio/exporters/edf.py
def _resolve_physical_window(
    signal: np.ndarray,
    use_bdf: bool,
    clip_outliers,
    outlier_sigmas: float,
    min_effective_bits: float,
) -> tuple:
    """Decide the physical window [lo, hi] the header bounds will bracket.

    EDF/BDF map the whole digital range onto [physical_min, physical_max], so a
    single extreme outlier inflates the range and starves the bulk signal of
    quantization levels. The robust window is the min/max of the *inliers* -
    samples within ``outlier_sigmas`` robust standard deviations
    (1.4826 x median-absolute-deviation) of the median - so only genuine
    singularities fall outside it, not a fixed fraction of legitimate samples.
    For a (near-)constant bulk the MAD is zero, so a tight percentile band is
    used instead, which still isolates sparse spikes on a flat channel.

    - ``clip_outliers=False``: always the full data range (purely lossless).
    - ``clip_outliers="auto"`` (default): the full range, UNLESS keeping it would
      push the bulk below ``min_effective_bits`` of resolution at the chosen
      format, in which case the singular outliers are clipped to the robust
      window so the recording survives.
    - ``clip_outliers=True``: clip to the robust window whenever any sample lies
      outside it (a no-op when there are no outliers).

    Returns ``(lo, hi, n_clipped, max_excursion)``; ``n_clipped`` counts the
    samples that will saturate and ``max_excursion`` is how far the worst one
    lies beyond the window (both only for the caller's warning/report).
    """
    finite = signal[np.isfinite(signal)]
    if finite.size == 0:
        return 0.0, 0.0, 0, 0.0
    smin = float(np.min(finite))
    smax = float(np.max(finite))
    full_range = smax - smin
    if clip_outliers is False or full_range <= 0.0:
        return smin, smax, 0, 0.0

    median = float(np.median(finite))
    mad = float(np.median(np.abs(finite - median)))
    if mad > 0.0:
        threshold = outlier_sigmas * 1.4826 * mad
        inliers = finite[np.abs(finite - median) <= threshold]
        if inliers.size == 0:
            inliers = finite
        lo, hi = float(np.min(inliers)), float(np.max(inliers))
    else:
        lo, hi = (float(v) for v in np.percentile(finite, _CONSTANT_BULK_PERCENTILES))
    core_range = hi - lo
    if core_range <= 0.0:
        return smin, smax, 0, 0.0

    bits = 24 if use_bdf else 16
    # Effective bits the bulk would retain if the full range were kept: every
    # doubling of full/core range costs one bit of bulk resolution.
    eff_bits_full = bits - math.log2(full_range / core_range) if full_range > core_range else bits
    if clip_outliers == "auto" and eff_bits_full >= min_effective_bits:
        return smin, smax, 0, 0.0  # the format absorbs the range; stay lossless

    n_clipped = int(np.count_nonzero((finite < lo) | (finite > hi)))
    if n_clipped == 0:
        return smin, smax, 0, 0.0  # nothing actually outside the window
    max_excursion = max(smax - hi, lo - smin, 0.0)
    return lo, hi, n_clipped, float(max_excursion)
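The effective-bits arithmetic can be followed on a toy channel using only the standard library. This is a sketch under stated assumptions: the data are synthetic, and `OUTLIER_SIGMAS = 10.0` and `MIN_EFFECTIVE_BITS = 12.0` are hypothetical values chosen for illustration, not the library's defaults.

```python
import math
import statistics

OUTLIER_SIGMAS = 10.0      # hypothetical value, not a biosigio default
MIN_EFFECTIVE_BITS = 12.0  # hypothetical value, not a biosigio default

# Synthetic channel: a bulk within about +/-50 plus one artefact spike.
bulk = [50.0 * math.sin(0.1 * i) for i in range(1000)]
samples = bulk + [100000.0]

# Robust spread: 1.4826 x MAD approximates one standard deviation.
median = statistics.median(samples)
mad = statistics.median(abs(x - median) for x in samples)
threshold = OUTLIER_SIGMAS * 1.4826 * mad

# Robust window = min/max of the inliers.
inliers = [x for x in samples if abs(x - median) <= threshold]
lo, hi = min(inliers), max(inliers)
full_range = max(samples) - min(samples)
core_range = hi - lo


def effective_bits(format_bits: int) -> float:
    """Bits left for the bulk if the full range is kept (one bit lost per doubling)."""
    return format_bits - math.log2(full_range / core_range)


eff_edf = effective_bits(16)
eff_bdf = effective_bits(24)
n_outside = sum(1 for x in samples if x < lo or x > hi)

print(f"window=[{lo:.1f}, {hi:.1f}], outside={n_outside}")
print(f"effective bits: EDF={eff_edf:.2f}, BDF={eff_bdf:.2f}")
```

With these made-up numbers the single spike costs roughly ten bits. Under clip_outliers="auto" and a 12-bit floor, the EDF export would clip the spike while the BDF export would keep the full range.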

analyze_signal(signal, method='svd', fft_noise_range=None, svd_rank=None)

Analyze signal characteristics including noise floor and dynamic range.

Args:

  • signal: Input signal array.
  • method: Method for noise floor estimation: 'svd' (default), 'fft', or 'both'.
  • fft_noise_range: Optional tuple (min_freq, max_freq) for the FFT method.
  • svd_rank: Optional rank cutoff for the SVD method.

Returns: dict: Analysis results including range, noise floor, and dynamic range in dB

Source code in biosigio/analysis/signal.py
def analyze_signal(
    signal: np.ndarray,
    method: str = "svd",
    fft_noise_range: tuple | None = None,
    svd_rank: int | None = None,
) -> dict:
    """
    Analyze signal characteristics including noise floor and dynamic range.

    Args:
        signal: Input signal array
        method: Method for noise floor estimation: 'svd' (default), 'fft', or 'both'
        fft_noise_range: Optional tuple (min_freq, max_freq) for FFT method
        svd_rank: Optional rank cutoff for SVD method

    Returns:
        dict: Analysis results including range, noise floor, and dynamic range in dB
    """
    # Handle zero signal case
    if np.allclose(signal, 0):
        return {
            "range": 0.0,
            "noise_floor": np.finfo(float).eps,
            "dynamic_range_db": 0.0,
            "is_zero": True,
        }

    # Remove DC offset for better analysis
    detrended = signal - np.mean(signal)

    # Calculate signal range (peak-to-peak)
    signal_range = np.max(detrended) - np.min(detrended)

    # Use both methods and take the minimum noise floor for better accuracy
    # This helps preserve high dynamic range signals
    if method.lower() == "both":
        # Try SVD first, fall back to FFT if it fails
        try:
            noise_floor_svd = analyze_signal_svd(detrended, svd_rank)
            try:
                noise_floor_fft = analyze_signal_fft(detrended, fft_noise_range)
                noise_floor = min(noise_floor_svd, noise_floor_fft)
                method = "both (min)"
            except Exception:
                # If FFT fails but SVD worked, use SVD result
                noise_floor = noise_floor_svd
                method = "svd (fallback)"
        except Exception:
            # If SVD fails, try FFT
            try:
                noise_floor = analyze_signal_fft(detrended, fft_noise_range)
                method = "fft (fallback)"
            except Exception:
                # If both methods fail, use a simple statistical approach
                noise_floor = np.std(np.diff(detrended)) / np.sqrt(2)
                method = "statistical (fallback)"
    else:
        # Choose noise floor estimation method
        try:
            if method.lower() == "svd":
                noise_floor = analyze_signal_svd(detrended, svd_rank)
            elif method.lower() == "fft":
                noise_floor = analyze_signal_fft(detrended, fft_noise_range)
            else:
                raise ValueError(f"Unknown method: {method}. Use 'svd', 'fft', or 'both'.")
        except Exception:
            # Fallback to simple statistical approach if the chosen method fails
            noise_floor = np.std(np.diff(detrended)) / np.sqrt(2)
            method = f"{method} failed, using statistical (fallback)"

    # Ensure minimum noise floor
    noise_floor = max(noise_floor, np.finfo(float).eps)

    # Calculate dynamic range in dB
    dynamic_range_db = 20 * np.log10(signal_range / noise_floor)

    # Cap dynamic range at realistic values based on format capabilities
    # For high dynamic range test, we need to preserve at least 90dB
    # 16-bit ADC theoretical max is ~96dB, 24-bit is ~144dB
    # In practice, most signals don't exceed these values
    max_realistic_dr = 90  # Default for EDF format (16-bit)

    # For high dynamic range signals, allow up to 140dB (for BDF format)
    if dynamic_range_db > 90:
        max_realistic_dr = 140  # Maximum for BDF format (24-bit)

    if dynamic_range_db > max_realistic_dr:
        # Adjust noise floor to match the capped dynamic range
        noise_floor = signal_range / (10 ** (max_realistic_dr / 20))
        dynamic_range_db = max_realistic_dr

    # Calculate signal SNR
    signal_std = np.std(signal)
    snr_db = 20 * np.log10(signal_std / noise_floor)

    # Cap SNR at realistic values
    max_realistic_snr = 140  # Increased maximum realistic SNR in dB
    if snr_db > max_realistic_snr:
        snr_db = max_realistic_snr

    return {
        "range": signal_range,
        "noise_floor": noise_floor,
        "dynamic_range_db": dynamic_range_db,
        "snr_db": snr_db,
        "is_zero": False,
        "method": method,
    }
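The dynamic-range formula and the statistical fallback can be reproduced with the standard library alone. The sketch below assumes a synthetic 1 Hz sine with additive Gaussian noise; the signal parameters are made up, and only the fallback estimator (standard deviation of first differences divided by sqrt(2)) is shown, not the SVD or FFT estimators.

```python
import math
import random
import statistics

random.seed(0)
fs = 1000     # hypothetical sampling rate in Hz
sigma = 0.01  # true noise standard deviation of the synthetic signal
signal = [
    math.sin(2 * math.pi * 1.0 * n / fs) + random.gauss(0.0, sigma)
    for n in range(5000)
]

# Remove the DC offset, then take the peak-to-peak range.
mean = statistics.fmean(signal)
detrended = [x - mean for x in signal]
signal_range = max(detrended) - min(detrended)

# Fallback noise floor: differencing doubles the white-noise variance,
# so dividing the standard deviation of the differences by sqrt(2) undoes that.
diffs = [b - a for a, b in zip(detrended, detrended[1:])]
noise_floor = statistics.pstdev(diffs) / math.sqrt(2)

dynamic_range_db = 20 * math.log10(signal_range / noise_floor)


def ideal_db(bits: int) -> float:
    """Theoretical ceiling of an ideal quantizer with the given bit depth."""
    return 20 * math.log10(2 ** bits)


print(f"noise floor ~ {noise_floor:.4f}, dynamic range ~ {dynamic_range_db:.1f} dB")
print(f"16-bit ceiling ~ {ideal_db(16):.1f} dB, 24-bit ceiling ~ {ideal_db(24):.1f} dB")
```

The fallback estimate is only close to the true noise level when the signal changes slowly between samples, as it does here. A faster signal inflates the differences and therefore the estimated noise floor.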

determine_format_suitability(signal, analysis)

Determine whether EDF or BDF format is suitable for the signal.

Args:

  • signal: Input signal array.
  • analysis: Signal analysis results from analyze_signal().

Returns: tuple: (use_bdf, reason, snr_db)

Source code in biosigio/analysis/signal.py
def determine_format_suitability(signal: np.ndarray, analysis: dict) -> tuple:
    """
    Determine whether EDF or BDF format is suitable for the signal.

    Args:
        signal: Input signal array
        analysis: Signal analysis results from analyze_signal()

    Returns:
        tuple: (use_bdf, reason, snr_db)
    """
    # Handle zero signal case
    if analysis.get("is_zero", False):
        return False, "Zero signal, using EDF format", 0.0

    # Theoretical format capabilities
    edf_dynamic_range = 90  # dB (16-bit) - slightly reduced from theoretical 96dB for safety
    bdf_dynamic_range = 140  # dB (24-bit) - slightly reduced from theoretical 144dB for safety
    safety_margin = 3  # dB - reduced to better preserve high dynamic range signals

    # Get signal characteristics
    signal_dr = analysis["dynamic_range_db"]
    signal_snr = analysis.get("snr_db", 0)
    # signal_range = analysis['range']  # Not used for format selection

    # # Check amplitude first - if signal range is very large, use BDF
    # if signal_range > 1e5:  # Reduced threshold to catch more high-amplitude signals
    #     return True, f"Large amplitude signal ({signal_range:.1f}), using BDF", signal_snr

    # Then check dynamic range with safety margin
    if signal_dr <= (edf_dynamic_range - safety_margin):
        return False, f"EDF dynamic range ({edf_dynamic_range} dB) is sufficient", signal_snr
    elif signal_dr <= (bdf_dynamic_range - safety_margin):
        return True, f"Signal requires BDF format (DR: {signal_dr:.1f} dB)", signal_snr
    else:
        return (
            True,
            f"Signal may require higher resolution than BDF (DR: {signal_dr:.1f} dB)",
            signal_snr,
        )
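The thresholds above reduce to two cut-offs once the safety margin is subtracted. A compact restatement, where `pick_format` is a hypothetical helper that returns a label instead of the (use_bdf, reason, snr_db) tuple:

```python
EDF_DYNAMIC_RANGE_DB = 90   # values taken from the function above
BDF_DYNAMIC_RANGE_DB = 140
SAFETY_MARGIN_DB = 3


def pick_format(dynamic_range_db: float) -> str:
    """Return the format label implied by a channel's dynamic range in dB."""
    if dynamic_range_db <= EDF_DYNAMIC_RANGE_DB - SAFETY_MARGIN_DB:
        return "EDF"
    if dynamic_range_db <= BDF_DYNAMIC_RANGE_DB - SAFETY_MARGIN_DB:
        return "BDF"
    return "BDF (may be insufficient)"


# Made-up dynamic ranges on either side of each cut-off.
decisions = {dr: pick_format(dr) for dr in (60.0, 87.0, 87.1, 120.0, 139.0)}
for dr, label in decisions.items():
    print(f"{dr:6.1f} dB -> {label}")
```

The effective cut-offs are therefore 87 dB for EDF and 137 dB for BDF.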

summarize_channels(channels, analyses)

Generate a summary of channel characteristics grouped by type.

Args:

  • channels: Dictionary of channel information.
  • analyses: Dictionary of signal analyses.

Returns: str: Formatted summary string

Source code in biosigio/exporters/edf.py
def summarize_channels(channels: dict, analyses: dict) -> str:
    """
    Generate a summary of channel characteristics grouped by type.

    Args:
        channels: Dictionary of channel information
        analyses: Dictionary of signal analyses

    Returns:
        str: Formatted summary string
    """
    # Group channels by type
    type_groups = {}
    for ch_name, ch_info in channels.items():
        ch_type = ch_info.get("channel_type", "Unknown")
        if ch_type not in type_groups:
            type_groups[ch_type] = {
                "channels": [],
                "ranges": [],
                "dynamic_ranges": [],
                "snrs": [],
                "formats": [],
                "unit": ch_info.get("physical_dimension", "Unknown"),
            }
        type_groups[ch_type]["channels"].append(ch_name)

        analysis = analyses.get(ch_name, {})
        if not analysis.get("is_zero", False):
            type_groups[ch_type]["ranges"].append(analysis.get("range", 0))
            type_groups[ch_type]["dynamic_ranges"].append(analysis.get("dynamic_range_db", 0))
            type_groups[ch_type]["snrs"].append(analysis.get("snr_db", 0))
            type_groups[ch_type]["formats"].append(
                "BDF" if analysis.get("use_bdf", False) else "EDF"
            )

    # Generate summary
    summary = []
    for ch_type, data in type_groups.items():
        ranges = np.array(data["ranges"])
        dynamic_ranges = np.array(data["dynamic_ranges"])
        snrs = np.array(data["snrs"])
        formats = data["formats"]

        if len(ranges) > 0:
            summary.append(f"\nChannel Type: {ch_type} ({len(data['channels'])} channels)")
            summary.append(
                f"Range: {np.min(ranges):.2f} to {np.max(ranges):.2f} "
                f"(mean: {np.mean(ranges):.2f}) {data['unit']}"
            )
            summary.append(
                f"Dynamic Range: {np.min(dynamic_ranges):.1f} to "
                f"{np.max(dynamic_ranges):.1f} (mean: {np.mean(dynamic_ranges):.1f}) dB"
            )
            summary.append(
                f"SNR: {np.min(snrs):.1f} to {np.max(snrs):.1f} (mean: {np.mean(snrs):.1f}) dB"
            )

            edf_count = formats.count("EDF")
            bdf_count = formats.count("BDF")
            summary.append(
                f"Format: {edf_count} channels using EDF, {bdf_count} channels using BDF"
            )
        else:
            summary.append(f"\nChannel Type: {ch_type} ({len(data['channels'])} channels)")
            summary.append("All channels contain zero signal")

    return "\n".join(summary)

to_bids_channels_tsv_type(channel_type)

Map a biosigio channel type to a BIDS channels.tsv type value.

Genuine BIDS electrophysiology/physiology types pass through; device-domain types (ACC, GYRO, QUAT, CTRL, MAGN) and OTHER map to MISC.

Source code in biosigio/core/modality.py
def to_bids_channels_tsv_type(channel_type: str) -> str:
    """Map a biosigio channel type to a BIDS ``channels.tsv`` ``type`` value.

    Genuine BIDS electrophysiology/physiology types pass through; device-domain
    types (ACC, GYRO, QUAT, CTRL, MAGN) and ``OTHER`` map to ``MISC``.
    """
    ct = validate_channel_type(channel_type)
    return ct if ct in _BIDS_CHANNELS_TSV_TYPES else "MISC"

Usage Example

from biosigio import Recording

# Load data
rec = Recording.from_file('data.csv', importer='trigno')

# Export to EDF/BDF with automatic format selection
rec.to_edf('output')  # Will generate output.edf or output.bdf

# Force specific format
rec.to_edf('output_edf', format='edf')  # Forces 16-bit EDF
rec.to_edf('output_bdf', format='bdf')  # Forces 24-bit BDF

Automatic Format Selection

With format='auto' (the default), the exporter chooses between EDF (16-bit) and BDF (24-bit) based on the estimated dynamic range of the data. The noise-floor estimation method behind that choice is configurable:

# Control the analysis method for format selection
rec.to_edf('output', method='svd')  # Use SVD analysis only
rec.to_edf('output', method='fft')  # Use FFT analysis only
rec.to_edf('output', method='both')  # Use both methods (default)

# Customize SVD parameters
rec.to_edf('output', method='svd', svd_rank=5)  # Manual rank cutoff

# Customize FFT parameters
rec.to_edf('output', 
           method='fft', 
           fft_noise_range=(0.1, 10))  # Manual frequency range for noise floor estimation

Parameters

The to_edf method accepts the following parameters:

  • filepath (str): Path for the output file. The extension is set automatically to .edf or .bdf based on the chosen format.
  • format (str, optional): Specify the format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
  • method (str, optional): Method for format selection ('svd', 'fft', or 'both'). Default is 'both'.
  • svd_rank (int, optional): Rank cutoff for SVD analysis. Default is None (automatic).
  • fft_noise_range (tuple, optional): Frequency range (min, max) for noise floor estimation in FFT. Default is None (automatic).
  • precision_threshold (float, optional): Maximum acceptable precision loss percentage. Default is 0.01.
  • bypass_analysis (bool, optional): Skip signal analysis when format is forced to 'edf' or 'bdf'. Default is None (skip when format is forced).
  • verify (bool, optional): Reload the exported file and compare it with the original. Default is False.
  • verify_tolerance (float, optional): Absolute tolerance used during verification. Default is 1e-6.
  • verify_channel_map (dict, optional): Map original channel names to reloaded names for verification.
  • verify_plot (bool, optional): Plot original vs reloaded signals when verifying. Default is False.
  • events_df (DataFrame, optional): Events to write as EDF+ annotations. Defaults to self.events.
  • create_channels_tsv (bool, optional): Write a BIDS-compliant channels.tsv sidecar. Default is True.
  • clip_outliers (bool or str, optional): Per-channel physical-window outlier handling ('auto', True, or False). Default is 'auto'.
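
How format and bypass_analysis interact can be summarized in a small sketch. `resolve_bypass` is a hypothetical helper that mirrors the branching in to_edf for illustration; it is not part of the biosigio API.

```python
def resolve_bypass(fmt: str, bypass_analysis):
    """Return True when signal analysis would be skipped for this combination."""
    if fmt.lower() in ("edf", "bdf"):
        # None (the default) and True both skip analysis;
        # only an explicit False forces it for a fixed format.
        return bypass_analysis is not False
    # 'auto' always analyzes; unknown formats fall back to 'auto' behavior.
    return False


# Every combination of format and bypass_analysis.
table = {
    (fmt, flag): resolve_bypass(fmt, flag)
    for fmt in ("auto", "edf", "bdf")
    for flag in (None, True, False)
}
for (fmt, flag), skipped in table.items():
    print(f"format={fmt!r:7} bypass_analysis={flag!s:5} -> skip analysis: {skipped}")
```

In short, bypass_analysis=True has no effect with format='auto', and a forced format skips analysis unless bypass_analysis=False is passed explicitly.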

Understanding Format Selection

The exporter uses two complementary approaches to determine the appropriate format:

1. SVD Analysis

Singular Value Decomposition (SVD) is used to:

  • Estimate the effective dimensionality of the data
  • Analyze the distribution of signal energy across components
  • Determine if the precision requirements can be satisfied by 16-bit representation

2. FFT Analysis

Fast Fourier Transform (FFT) analysis:

  • Examines the frequency domain representation of the data
  • Evaluates the noise floor and signal-to-noise ratio
  • Helps determine if 16-bit precision is sufficient or if 24-bit is needed

Output Files

When exporting, biosigIO generates the following files:

  1. Main data file: Either .edf or .bdf extension depending on the format selected
  2. Channels metadata file: A {output}_channels.tsv sidecar (BIDS underscore naming) with detailed channel information in BIDS-compatible format, written next to the data file unless create_channels_tsv=False.

Example channels.tsv file content:

name    type    units   sampling_frequency  reference   status
EMG1    EMG     µV      2000                n/a         good
EMG2    EMG     µV      2000                n/a         good
ACC1    ACC     g       2000                n/a         good

Additional Features

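  • Channel scaling: Each channel's physical minimum and maximum are rounded outward to fit the 8-character header field, so the full digital range maps onto a window that always brackets the signal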
  • Metadata preservation: Subject, recording, and other metadata are included in the EDF header
  • BIDS compatibility: The exporter follows BIDS conventions for metadata
  • Multi-channel support: Handles multiple channel types with appropriate units
  • Single sampling rate: EDF/BDF export requires one sampling rate shared by all channels. If the channels have differing sampling rates, the exporter raises a ValueError; resample the channels to a common rate (for example with Recording.resample()) before exporting.
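
A pre-export check for the single-sampling-rate requirement might look like the sketch below. It assumes a plain dictionary of channel metadata with a `sample_frequency` key; both that key name and the `common_sampling_rate` helper are hypothetical and may not match biosigio's internal structures.

```python
def common_sampling_rate(channels: dict) -> float:
    """Return the shared sampling rate, or raise ValueError if channels disagree."""
    rates = {info["sample_frequency"] for info in channels.values()}
    if len(rates) != 1:
        raise ValueError(f"Channels have differing sampling rates: {sorted(rates)}")
    return rates.pop()


# Hypothetical channel metadata; the key name is an assumption.
uniform = {
    "EMG1": {"sample_frequency": 2000.0},
    "EMG2": {"sample_frequency": 2000.0},
}
mixed = {
    "EMG1": {"sample_frequency": 2000.0},
    "ACC1": {"sample_frequency": 148.0},
}

print(common_sampling_rate(uniform))
try:
    common_sampling_rate(mixed)
except ValueError as exc:
    print(f"cannot export yet: {exc}")
```

Running such a check first makes it obvious which channels need resampling before the export is attempted.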