
XDF Importer

The XDFImporter class handles importing data from XDF (Extensible Data Format) files, the native format for Lab Streaming Layer (LSL) recordings. It supports multi-stream files with different sampling rates and data types.

Class Documentation

emgio.importers.xdf

XDF (Extensible Data Format) importer for EMG data.

XDF files can contain multiple streams (EMG, EEG, markers, etc.). This module provides tools to explore XDF contents and selectively import specific streams.

BaseImporter

Bases: ABC

Base class for EMG data importers.

Source code in emgio/importers/base.py
class BaseImporter(ABC):
    """Base class for EMG data importers."""

    @abstractmethod
    def load(self, filepath: str) -> EMG:
        """
        Load EMG data from file.

        Args:
            filepath: Path to the input file

        Returns:
            EMG: EMG object containing the loaded data
        """
        pass

load(filepath) abstractmethod

Load EMG data from file.

Args:
    filepath: Path to the input file

Returns:
    EMG: EMG object containing the loaded data

Source code in emgio/importers/base.py
@abstractmethod
def load(self, filepath: str) -> EMG:
    """
    Load EMG data from file.

    Args:
        filepath: Path to the input file

    Returns:
        EMG: EMG object containing the loaded data
    """
    pass
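New importers plug into this ABC by implementing `load`. The following is a minimal, hypothetical sketch; `MyFormatImporter` is an illustrative name, and a placeholder `EMG` class stands in for `emgio.core.emg.EMG` so the example is self-contained:

```python
from abc import ABC, abstractmethod


class EMG:  # placeholder standing in for emgio.core.emg.EMG
    pass


class BaseImporter(ABC):
    """Base class for EMG data importers (mirrors the source above)."""

    @abstractmethod
    def load(self, filepath: str) -> EMG:
        """Load EMG data from file."""


class MyFormatImporter(BaseImporter):
    """Hypothetical importer for a custom file format."""

    def load(self, filepath: str) -> EMG:
        emg = EMG()
        # ... parse `filepath` and populate emg.signals / emg.channels ...
        return emg


print(isinstance(MyFormatImporter().load("dummy.dat"), EMG))  # -> True
```

Because `load` is abstract, `BaseImporter` itself cannot be instantiated; only concrete subclasses can.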

EMG

Core EMG class for handling EMG data and metadata.

Attributes:
    signals (pd.DataFrame): Raw signal data with time as index.
    metadata (dict): Metadata dictionary containing recording information.
    channels (dict): Channel information including type, unit, sampling frequency.
    events (pd.DataFrame): Annotations or events associated with the signals,
        with columns 'onset', 'duration', 'description'.

Source code in emgio/core/emg.py
class EMG:
    """
    Core EMG class for handling EMG data and metadata.

    Attributes:
        signals (pd.DataFrame): Raw signal data with time as index.
        metadata (dict): Metadata dictionary containing recording information.
        channels (dict): Channel information including type, unit, sampling frequency.
        events (pd.DataFrame): Annotations or events associated with the signals,
                               with columns 'onset', 'duration', 'description'.
    """

    def __init__(self):
        """Initialize an empty EMG object."""
        self.signals = None
        self.metadata = {}
        self.channels = {}
        # Initialize events as an empty DataFrame with specified columns
        self.events = pd.DataFrame(columns=["onset", "duration", "description"])

    def plot_signals(
        self,
        channels=None,
        time_range=None,
        offset_scale=0.8,
        uniform_scale=True,
        detrend=False,
        grid=True,
        title=None,
        show=True,
        plt_module=None,
    ):
        """
        Plot EMG signals in a single plot with vertical offsets.

        Args:
            channels: List of channels to plot. If None, plot all channels.
            time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
            offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
            uniform_scale: Whether to use the same scale for all signals.
            detrend: Whether to remove mean from signals before plotting.
            grid: Whether to show grid lines.
            title: Optional title for the figure.
            show: Whether to display the plot.
            plt_module: Matplotlib pyplot module to use.
        """
        # Delegate to the static plotting function in visualization module
        static_plot_signals(
            emg_object=self,
            channels=channels,
            time_range=time_range,
            offset_scale=offset_scale,
            uniform_scale=uniform_scale,
            detrend=detrend,
            grid=grid,
            title=title,
            show=show,
            plt_module=plt_module,
        )

    @classmethod
    def _infer_importer(cls, filepath: str) -> str:
        """
        Infer the importer to use based on the file extension.
        """
        extension = os.path.splitext(filepath)[1].lower()
        if extension in {".edf", ".bdf"}:
            return "edf"
        elif extension in {".set"}:
            return "eeglab"
        elif extension in {".otb", ".otb+"}:
            return "otb"
        elif extension in {".csv", ".txt"}:
            return "csv"
        elif extension in {".hea", ".dat", ".atr"}:
            return "wfdb"
        elif extension in {".xdf", ".xdfz"}:
            return "xdf"
        else:
            raise ValueError(f"Unsupported file extension: {extension}")

    @classmethod
    def from_file(
        cls,
        filepath: str,
        importer: Literal["trigno", "otb", "eeglab", "edf", "csv", "wfdb", "xdf"] | None = None,
        force_csv: bool = False,
        **kwargs,
    ) -> "EMG":
        """
        Create an EMG object from a file.

        Args:
            filepath: Path to the input file
            importer: Name of the importer to use. Can be one of the following:
                - 'trigno': Delsys Trigno EMG system (CSV)
                - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
                - 'eeglab': EEGLAB .set files (SET)
                - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
                - 'csv': Generic CSV (or TXT) files with columnar data
                - 'wfdb': Waveform Database (WFDB)
                - 'xdf': XDF format (multi-stream Lab Streaming Layer files)
                If None, the importer will be inferred from the file extension.
                For CSV/TXT files, the specific format is additionally
                detected from the file contents.
            force_csv: If True and importer is 'csv', forces using the generic CSV
                      importer even if the file appears to match a specialized format.
            **kwargs: Additional arguments passed to the importer.
                For XDF files, useful kwargs include:
                - stream_names: List of stream names to import
                - stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
                - stream_ids: List of stream IDs to import

        Returns:
            EMG: New EMG object with loaded data
        """
        if importer is None:
            importer = cls._infer_importer(filepath)

        importers = {
            "trigno": "TrignoImporter",  # CSV with Delsys Trigno Headers
            "otb": "OTBImporter",  # OTB/OTB+ EMG system data
            "edf": "EDFImporter",  # EDF/EDF+/BDF format
            "eeglab": "EEGLABImporter",  # EEGLAB .set files
            "csv": "CSVImporter",  # Generic CSV/Text files
            "wfdb": "WFDBImporter",  # Waveform Database format
            "xdf": "XDFImporter",  # XDF multi-stream format
        }

        if importer not in importers:
            raise ValueError(
                f"Unsupported importer: {importer}. "
                f"Available importers: {list(importers.keys())}\n"
                "- trigno: Delsys Trigno EMG system\n"
                "- otb: OTB/OTB+ EMG system\n"
                "- edf: EDF/EDF+/BDF format\n"
                "- eeglab: EEGLAB .set files\n"
                "- csv: Generic CSV/Text files\n"
                "- wfdb: Waveform Database\n"
                "- xdf: XDF multi-stream format"
            )

        # If using CSV importer and force_csv is set, pass it as force_generic
        if importer == "csv":
            kwargs["force_generic"] = force_csv

        # Import the appropriate importer class
        importer_module = __import__(
            f"emgio.importers.{importer}", globals(), locals(), [importers[importer]]
        )
        importer_class = getattr(importer_module, importers[importer])

        # Create importer instance and load data
        return importer_class().load(filepath, **kwargs)

    def select_channels(
        self,
        channels: str | list[str] | None = None,
        channel_type: str | None = None,
        inplace: bool = False,
    ) -> "EMG":
        """
        Select specific channels from the data and return a new EMG object.

        Args:
            channels: Channel name or list of channel names to select. If None and
                    channel_type is specified, selects all channels of that type.
            channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.).
                        If specified with channels, filters the selection to only
                        channels of this type.
            inplace: If True, apply the selection to this EMG object and
                    return it instead of creating a new object.

        Returns:
            EMG: A new EMG object containing only the selected channels

        Examples:
            # Select specific channels
            new_emg = emg.select_channels(['EMG1', 'ACC1'])

            # Select all EMG channels
            emg_only = emg.select_channels(channel_type='EMG')

            # Select specific EMG channels only; this example does not select ACC channels
            emg_subset = emg.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
        """
        if self.signals is None:
            raise ValueError("No signals loaded")

        # If channel_type specified but no channels, select all of that type
        if channels is None and channel_type is not None:
            channels = [
                ch for ch, info in self.channels.items() if info["channel_type"] == channel_type
            ]
            if not channels:
                raise ValueError(f"No channels found of type: {channel_type}")
        elif isinstance(channels, str):
            channels = [channels]

        # Validate channels exist
        if not all(ch in self.signals.columns for ch in channels):
            missing = [ch for ch in channels if ch not in self.signals.columns]
            raise ValueError(f"Channels not found: {missing}")

        # Filter by type if specified
        if channel_type is not None:
            channels = [ch for ch in channels if self.channels[ch]["channel_type"] == channel_type]
            if not channels:
                raise ValueError(f"None of the selected channels are of type: {channel_type}")

        # Create new EMG object
        new_emg = EMG()

        # Copy selected signals and channels
        new_emg.signals = self.signals[channels].copy()
        new_emg.channels = {ch: self.channels[ch].copy() for ch in channels}

        # Copy metadata
        new_emg.metadata = self.metadata.copy()

        if not inplace:
            return new_emg
        else:
            self.signals = new_emg.signals
            self.channels = new_emg.channels
            self.metadata = new_emg.metadata
            return self

    def get_channel_types(self) -> list[str]:
        """
        Get list of unique channel types in the data.

        Returns:
            List of channel types (e.g., ['EMG', 'ACC', 'GYRO'])
        """
        return list({info["channel_type"] for info in self.channels.values()})

    def get_channels_by_type(self, channel_type: str) -> list[str]:
        """
        Get list of channels of a specific type.

        Args:
            channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

        Returns:
            List of channel names of the specified type
        """
        return [ch for ch, info in self.channels.items() if info["channel_type"] == channel_type]

    def to_edf(
        self,
        filepath: str,
        method: str = "both",
        fft_noise_range: tuple = None,
        svd_rank: int = None,
        precision_threshold: float = 0.01,
        format: Literal["auto", "edf", "bdf"] = "auto",
        bypass_analysis: bool | None = None,
        verify: bool = False,
        verify_tolerance: float = 1e-6,
        verify_channel_map: dict[str, str] | None = None,
        verify_plot: bool = False,
        events_df: pd.DataFrame | None = None,
        create_channels_tsv: bool = True,
        **kwargs,
    ) -> dict | None:
        """
        Export EMG data to EDF/BDF format, optionally including events.

        Args:
            filepath: Path to save the EDF/BDF file
            method: Method for signal analysis ('svd', 'fft', or 'both')
                'svd': Uses Singular Value Decomposition for noise floor estimation
                'fft': Uses Fast Fourier Transform for noise floor estimation
                'both': Uses both methods and takes the minimum noise floor (default)
            fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
            svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
            precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
            format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
                    If 'edf' or 'bdf' is specified, that format will be used directly.
                    If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based
                    on signal analysis to minimize precision loss while preferring EDF
                    if sufficient.
            bypass_analysis: If True, skip signal analysis step when format is explicitly
                             set to 'edf' or 'bdf'. If None (default), analysis is skipped
                             automatically when format is forced. Set to False to force
                             analysis even with a specified format. Ignored if format='auto'.
            verify: If True, reload the exported file and compare signals with the original
                    to check for data integrity loss. Results are printed. (default: False)
            verify_tolerance: Absolute tolerance used when comparing signals during verification. (default: 1e-6)
            verify_channel_map: Optional dictionary mapping original channel names (keys)
                                to reloaded channel names (values) for verification.
                                Used if `verify` is True and channel names might differ.
            verify_plot: If True and verify is True, plots a comparison of original vs reloaded signals.
            events_df: Optional DataFrame with events ('onset', 'duration', 'description').
                      If None, uses self.events. (This provides flexibility)
            create_channels_tsv: If True, create a BIDS-compliant channels.tsv file (default: True)
            **kwargs: Additional arguments for the EDF exporter

        Returns:
            dict | None: If verify is True, returns a dictionary with
                         verification results. Otherwise, returns None.

        Raises:
            ValueError: If no signals are loaded
        """
        from ..exporters.edf import EDFExporter  # Local import

        if self.signals is None:
            raise ValueError("No signals loaded")

        # --- Determine if analysis should be bypassed ---
        final_bypass_analysis = False
        if format.lower() == "auto":
            if bypass_analysis is True:
                logging.warning(
                    "bypass_analysis=True ignored because format='auto'. Analysis is required."
                )
            # Analysis is always needed for 'auto' format
            final_bypass_analysis = False
        elif format.lower() in ["edf", "bdf"]:
            if bypass_analysis is None:
                # Default behaviour: skip analysis if format is forced
                final_bypass_analysis = True
                msg = (
                    f"Format forced to '{format}'. Skipping signal analysis for faster export. "
                    "Set bypass_analysis=False to force analysis."
                )
                logging.log(logging.CRITICAL, msg)
            elif bypass_analysis is True:
                final_bypass_analysis = True
                logging.log(logging.CRITICAL, "bypass_analysis=True set. Skipping signal analysis.")
            else:  # bypass_analysis is False
                final_bypass_analysis = False
                logging.info(
                    f"Format forced to '{format}' but bypass_analysis=False. Performing signal analysis."
                )
        else:
            # Should not happen if Literal type hint works, but good practice
            logging.warning(
                f"Unknown format '{format}'. Defaulting to 'auto' behavior (analysis enabled)."
            )
            format = "auto"
            final_bypass_analysis = False

        # Determine which events DataFrame to use
        if events_df is None:
            events_to_export = self.events
        else:
            events_to_export = events_df

        # Combine parameters
        all_params = {
            "precision_threshold": precision_threshold,
            "method": method,
            "fft_noise_range": fft_noise_range,
            "svd_rank": svd_rank,
            "format": format,
            "bypass_analysis": final_bypass_analysis,
            "events_df": events_to_export,  # Pass the events dataframe
            "create_channels_tsv": create_channels_tsv,
            **kwargs,
        }

        EDFExporter.export(self, filepath, **all_params)

        verification_report_dict = None
        if verify:
            logging.info(f"Verification requested. Reloading exported file: {filepath}")
            try:
                # Reload the exported file
                reloaded_emg = EMG.from_file(filepath, importer="edf")

                logging.info("Comparing original signals with reloaded signals...")
                # Compare signals using the imported function
                verification_results = compare_signals(
                    self, reloaded_emg, tolerance=verify_tolerance, channel_map=verify_channel_map
                )

                # Generate and log report using the imported function
                report_verification_results(verification_results, verify_tolerance)
                verification_report_dict = verification_results

                # Plot comparison using imported function if requested
                summary = verification_results.get("channel_summary", {})
                comparison_mode = summary.get("comparison_mode", "unknown")
                compared_count = sum(1 for k in verification_results if k != "channel_summary")

                if verify_plot and compared_count > 0 and comparison_mode != "failed":
                    plot_comparison(self, reloaded_emg, channel_map=verify_channel_map)
                elif verify_plot:
                    logging.warning(
                        "Skipping verification plot: No channels were successfully compared."
                    )

            except Exception as e:
                logging.error(f"Verification failed during reload or comparison: {e}")
                verification_report_dict = {
                    "error": str(e),
                    "channel_summary": {"comparison_mode": "failed"},
                }

        return verification_report_dict

    def set_metadata(self, key: str, value: any) -> None:
        """
        Set metadata value.

        Args:
            key: Metadata key
            value: Metadata value
        """
        self.metadata[key] = value

    def get_metadata(self, key: str) -> any:
        """
        Get metadata value.

        Args:
            key: Metadata key

        Returns:
            Value associated with the key
        """
        return self.metadata.get(key)

    def add_channel(
        self,
        label: str,
        data: np.ndarray,
        sample_frequency: float,
        physical_dimension: str,
        prefilter: str = "n/a",
        channel_type: str = "EMG",
    ) -> None:
        """
        Add a new channel to the EMG data.

        Args:
            label: Channel label or name (as per EDF specification)
            data: Channel data
            sample_frequency: Sampling frequency in Hz (as per EDF specification)
            physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
            prefilter: Pre-filtering applied to the channel
            channel_type: Channel type ('EMG', 'ACC', 'GYRO', etc.)
        """
        if self.signals is None:
            # Create DataFrame with time index
            time = np.arange(len(data)) / sample_frequency
            self.signals = pd.DataFrame(index=time)

        self.signals[label] = data
        self.channels[label] = {
            "sample_frequency": sample_frequency,
            "physical_dimension": physical_dimension,
            "prefilter": prefilter,
            "channel_type": channel_type,
        }

    def add_event(self, onset: float, duration: float, description: str) -> None:
        """
        Add an event/annotation to the EMG object.

        Args:
            onset: Event onset time in seconds.
            duration: Event duration in seconds.
            description: Event description string.
        """
        new_event = pd.DataFrame(
            [{"onset": onset, "duration": duration, "description": description}]
        )
        # Use pd.concat for appending, ignore_index=True resets the index
        self.events = pd.concat([self.events, new_event], ignore_index=True)
        # Sort events by onset time for consistency
        self.events.sort_values(by="onset", inplace=True)
        self.events.reset_index(drop=True, inplace=True)

__init__()

Source code in emgio/core/emg.py
def __init__(self):
    """Initialize an empty EMG object."""
    self.signals = None
    self.metadata = {}
    self.channels = {}
    # Initialize events as an empty DataFrame with specified columns
    self.events = pd.DataFrame(columns=["onset", "duration", "description"])

add_channel(label, data, sample_frequency, physical_dimension, prefilter='n/a', channel_type='EMG')

Add a new channel to the EMG data.

Args:
    label: Channel label or name (as per EDF specification)
    data: Channel data
    sample_frequency: Sampling frequency in Hz (as per EDF specification)
    physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
    prefilter: Pre-filtering applied to the channel
    channel_type: Channel type ('EMG', 'ACC', 'GYRO', etc.)

Source code in emgio/core/emg.py
def add_channel(
    self,
    label: str,
    data: np.ndarray,
    sample_frequency: float,
    physical_dimension: str,
    prefilter: str = "n/a",
    channel_type: str = "EMG",
) -> None:
    """
    Add a new channel to the EMG data.

    Args:
        label: Channel label or name (as per EDF specification)
        data: Channel data
        sample_frequency: Sampling frequency in Hz (as per EDF specification)
        physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
        prefilter: Pre-filtering applied to the channel
        channel_type: Channel type ('EMG', 'ACC', 'GYRO', etc.)
    """
    if self.signals is None:
        # Create DataFrame with time index
        time = np.arange(len(data)) / sample_frequency
        self.signals = pd.DataFrame(index=time)

    self.signals[label] = data
    self.channels[label] = {
        "sample_frequency": sample_frequency,
        "physical_dimension": physical_dimension,
        "prefilter": prefilter,
        "channel_type": channel_type,
    }
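The internals above can be seen in isolation: a time index is derived from the sample count and sampling frequency, and the data becomes a labelled DataFrame column. A minimal pandas sketch mirroring that logic for a first channel (signal values and names are illustrative):

```python
import numpy as np
import pandas as pd

# Mirror of add_channel's internals: derive a time index from the sample
# count and sampling frequency, then store the data as a labelled column.
fs = 1000.0                       # sampling frequency in Hz (illustrative)
data = np.array([0.1, 0.2, 0.3, 0.4, 0.5])
time = np.arange(len(data)) / fs  # 0.000, 0.001, 0.002, 0.003, 0.004 s
signals = pd.DataFrame(index=time)
signals["EMG1"] = data

# Channel metadata is kept in a plain dict keyed by label, as above.
channels = {
    "EMG1": {
        "sample_frequency": fs,
        "physical_dimension": "uV",
        "prefilter": "n/a",
        "channel_type": "EMG",
    }
}

print(signals.index[-1])  # -> 0.004
```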

add_event(onset, duration, description)

Add an event/annotation to the EMG object.

Args:
    onset: Event onset time in seconds.
    duration: Event duration in seconds.
    description: Event description string.

Source code in emgio/core/emg.py
def add_event(self, onset: float, duration: float, description: str) -> None:
    """
    Add an event/annotation to the EMG object.

    Args:
        onset: Event onset time in seconds.
        duration: Event duration in seconds.
        description: Event description string.
    """
    new_event = pd.DataFrame(
        [{"onset": onset, "duration": duration, "description": description}]
    )
    # Use pd.concat for appending, ignore_index=True resets the index
    self.events = pd.concat([self.events, new_event], ignore_index=True)
    # Sort events by onset time for consistency
    self.events.sort_values(by="onset", inplace=True)
    self.events.reset_index(drop=True, inplace=True)
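The concat-then-sort bookkeeping can be exercised standalone; a small pandas sketch with illustrative event values, showing that events are kept ordered by onset regardless of insertion order:

```python
import pandas as pd

# Events start as an empty table; each addition is concatenated and the
# table is re-sorted by onset, as in add_event above.
events = pd.DataFrame(columns=["onset", "duration", "description"])
for onset, duration, description in [(2.5, 0.0, "stim"), (1.0, 0.5, "rest")]:
    new_event = pd.DataFrame(
        [{"onset": onset, "duration": duration, "description": description}]
    )
    events = pd.concat([events, new_event], ignore_index=True)
    events.sort_values(by="onset", inplace=True)
    events.reset_index(drop=True, inplace=True)

print(list(events["description"]))  # -> ['rest', 'stim']
```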

from_file(filepath, importer=None, force_csv=False, **kwargs) classmethod

Create an EMG object from a file.

Args:
    filepath: Path to the input file
    importer: Name of the importer to use. Can be one of the following:
        - 'trigno': Delsys Trigno EMG system (CSV)
        - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
        - 'eeglab': EEGLAB .set files (SET)
        - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
        - 'csv': Generic CSV (or TXT) files with columnar data
        - 'wfdb': Waveform Database (WFDB)
        - 'xdf': XDF format (multi-stream Lab Streaming Layer files)
        If None, the importer will be inferred from the file extension.
        For CSV/TXT files, the specific format is additionally detected
        from the file contents.
    force_csv: If True and importer is 'csv', forces using the generic CSV
        importer even if the file appears to match a specialized format.
    **kwargs: Additional arguments passed to the importer.
        For XDF files, useful kwargs include:
        - stream_names: List of stream names to import
        - stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
        - stream_ids: List of stream IDs to import

Returns:
    EMG: New EMG object with loaded data

Source code in emgio/core/emg.py
@classmethod
def from_file(
    cls,
    filepath: str,
    importer: Literal["trigno", "otb", "eeglab", "edf", "csv", "wfdb", "xdf"] | None = None,
    force_csv: bool = False,
    **kwargs,
) -> "EMG":
    """
    Create an EMG object from a file.

    Args:
        filepath: Path to the input file
        importer: Name of the importer to use. Can be one of the following:
            - 'trigno': Delsys Trigno EMG system (CSV)
            - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
            - 'eeglab': EEGLAB .set files (SET)
            - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
            - 'csv': Generic CSV (or TXT) files with columnar data
            - 'wfdb': Waveform Database (WFDB)
            - 'xdf': XDF format (multi-stream Lab Streaming Layer files)
            If None, the importer will be inferred from the file extension.
            For CSV/TXT files, the specific format is additionally
            detected from the file contents.
        force_csv: If True and importer is 'csv', forces using the generic CSV
                  importer even if the file appears to match a specialized format.
        **kwargs: Additional arguments passed to the importer.
            For XDF files, useful kwargs include:
            - stream_names: List of stream names to import
            - stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
            - stream_ids: List of stream IDs to import

    Returns:
        EMG: New EMG object with loaded data
    """
    if importer is None:
        importer = cls._infer_importer(filepath)

    importers = {
        "trigno": "TrignoImporter",  # CSV with Delsys Trigno Headers
        "otb": "OTBImporter",  # OTB/OTB+ EMG system data
        "edf": "EDFImporter",  # EDF/EDF+/BDF format
        "eeglab": "EEGLABImporter",  # EEGLAB .set files
        "csv": "CSVImporter",  # Generic CSV/Text files
        "wfdb": "WFDBImporter",  # Waveform Database format
        "xdf": "XDFImporter",  # XDF multi-stream format
    }

    if importer not in importers:
        raise ValueError(
            f"Unsupported importer: {importer}. "
            f"Available importers: {list(importers.keys())}\n"
            "- trigno: Delsys Trigno EMG system\n"
            "- otb: OTB/OTB+ EMG system\n"
            "- edf: EDF/EDF+/BDF format\n"
            "- eeglab: EEGLAB .set files\n"
            "- csv: Generic CSV/Text files\n"
            "- wfdb: Waveform Database\n"
            "- xdf: XDF multi-stream format"
        )

    # If using CSV importer and force_csv is set, pass it as force_generic
    if importer == "csv":
        kwargs["force_generic"] = force_csv

    # Import the appropriate importer class
    importer_module = __import__(
        f"emgio.importers.{importer}", globals(), locals(), [importers[importer]]
    )
    importer_class = getattr(importer_module, importers[importer])

    # Create importer instance and load data
    return importer_class().load(filepath, **kwargs)
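When `importer` is None, the extension-based inference shown in the class source applies. A standalone sketch of that mapping (the same extensions handled by `_infer_importer`, expressed here as a lookup table):

```python
import os

# Extension-to-importer mapping, mirroring EMG._infer_importer.
EXTENSION_MAP = {
    ".edf": "edf", ".bdf": "edf",
    ".set": "eeglab",
    ".otb": "otb", ".otb+": "otb",
    ".csv": "csv", ".txt": "csv",
    ".hea": "wfdb", ".dat": "wfdb", ".atr": "wfdb",
    ".xdf": "xdf", ".xdfz": "xdf",
}


def infer_importer(filepath: str) -> str:
    """Return the importer name for a path, or raise ValueError."""
    extension = os.path.splitext(filepath)[1].lower()
    if extension not in EXTENSION_MAP:
        raise ValueError(f"Unsupported file extension: {extension}")
    return EXTENSION_MAP[extension]


print(infer_importer("session1.XDF"))  # -> xdf
```

Note that the extension is lowercased first, so `recording.EDF` and `recording.edf` resolve to the same importer.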

get_channel_types()

Get list of unique channel types in the data.

Returns: List of channel types (e.g., ['EMG', 'ACC', 'GYRO'])

Source code in emgio/core/emg.py
def get_channel_types(self) -> list[str]:
    """
    Get list of unique channel types in the data.

    Returns:
        List of channel types (e.g., ['EMG', 'ACC', 'GYRO'])
    """
    return list({info["channel_type"] for info in self.channels.values()})

get_channels_by_type(channel_type)

Get list of channels of a specific type.

Args:
    channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

Returns:
    List of channel names of the specified type

Source code in emgio/core/emg.py
def get_channels_by_type(self, channel_type: str) -> list[str]:
    """
    Get list of channels of a specific type.

    Args:
        channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

    Returns:
        List of channel names of the specified type
    """
    return [ch for ch, info in self.channels.items() if info["channel_type"] == channel_type]
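Both type queries operate on the plain `channels` dict: one reduces to a set comprehension, the other to a list comprehension. A self-contained sketch with illustrative channel names:

```python
# The channel registry is a plain dict keyed by label; the two queries
# above are a set comprehension (unique types) and a list comprehension
# (labels of a given type).
channels = {
    "EMG1": {"channel_type": "EMG"},
    "EMG2": {"channel_type": "EMG"},
    "ACC1": {"channel_type": "ACC"},
}

channel_types = list({info["channel_type"] for info in channels.values()})
emg_channels = [ch for ch, info in channels.items() if info["channel_type"] == "EMG"]

print(sorted(channel_types))  # -> ['ACC', 'EMG']
print(emg_channels)           # -> ['EMG1', 'EMG2']
```

Since `channel_types` comes from a set, its order is not guaranteed; sort it if a stable order matters.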

get_metadata(key)

Get metadata value.

Args: key: Metadata key

Returns: Value associated with the key

Source code in emgio/core/emg.py
def get_metadata(self, key: str) -> any:
    """
    Get metadata value.

    Args:
        key: Metadata key

    Returns:
        Value associated with the key
    """
    return self.metadata.get(key)

plot_signals(channels=None, time_range=None, offset_scale=0.8, uniform_scale=True, detrend=False, grid=True, title=None, show=True, plt_module=None)

Plot EMG signals in a single plot with vertical offsets.

Args:
    channels: List of channels to plot. If None, plot all channels.
    time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
    offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
    uniform_scale: Whether to use the same scale for all signals.
    detrend: Whether to remove mean from signals before plotting.
    grid: Whether to show grid lines.
    title: Optional title for the figure.
    show: Whether to display the plot.
    plt_module: Matplotlib pyplot module to use.

Source code in emgio/core/emg.py
def plot_signals(
    self,
    channels=None,
    time_range=None,
    offset_scale=0.8,
    uniform_scale=True,
    detrend=False,
    grid=True,
    title=None,
    show=True,
    plt_module=None,
):
    """
    Plot EMG signals in a single plot with vertical offsets.

    Args:
        channels: List of channels to plot. If None, plot all channels.
        time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
        offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
        uniform_scale: Whether to use the same scale for all signals.
        detrend: Whether to remove mean from signals before plotting.
        grid: Whether to show grid lines.
        title: Optional title for the figure.
        show: Whether to display the plot.
        plt_module: Matplotlib pyplot module to use.
    """
    # Delegate to the static plotting function in visualization module
    static_plot_signals(
        emg_object=self,
        channels=channels,
        time_range=time_range,
        offset_scale=offset_scale,
        uniform_scale=uniform_scale,
        detrend=detrend,
        grid=grid,
        title=title,
        show=show,
        plt_module=plt_module,
    )
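The vertical-offset layout can be sketched independently of matplotlib: each channel is assigned one unit-height slot, and `offset_scale` bounds how much of that slot the normalized signal may occupy. A minimal numpy sketch (an illustration of the layout idea, not the actual `static_plot_signals` implementation; `offset_traces` is a hypothetical helper):

```python
import numpy as np

def offset_traces(signals: np.ndarray, offset_scale: float = 0.8) -> np.ndarray:
    """Shift each row of `signals` into its own unit-height slot.

    Each signal is scaled so its peak amplitude fills at most
    `offset_scale` of the slot, then shifted up by the slot index.
    """
    out = np.empty_like(signals, dtype=float)
    for i, sig in enumerate(signals):
        peak = np.max(np.abs(sig)) or 1.0  # avoid dividing by zero
        out[i] = sig / peak * (offset_scale / 2) + i
    return out

traces = offset_traces(np.array([[1.0, -1.0], [2.0, 0.0]]))
# Row 0 is centered on 0, row 1 on 1; peaks stay within 0.4 of each slot center.
```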

select_channels(channels=None, channel_type=None, inplace=False)

Select specific channels from the data and return a new EMG object.

Args:
    channels: Channel name or list of channel names to select. If None and
        channel_type is specified, selects all channels of that type.
    channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.). If
        specified with channels, filters the selection to only channels of this type.
    inplace: If True, apply the selection to this EMG object and return it
        instead of creating a new object (default: False).

Returns: EMG: EMG object containing only the selected channels (a new object, or self if inplace=True)

Examples:
# Select specific channels
new_emg = emg.select_channels(['EMG1', 'ACC1'])

# Select all EMG channels
emg_only = emg.select_channels(channel_type='EMG')

# Select only the EMG channels from the list; the ACC channel is filtered out
emg_subset = emg.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
Source code in emgio/core/emg.py
def select_channels(
    self,
    channels: str | list[str] | None = None,
    channel_type: str | None = None,
    inplace: bool = False,
) -> "EMG":
    """
    Select specific channels from the data and return a new EMG object.

    Args:
        channels: Channel name or list of channel names to select. If None and
                channel_type is specified, selects all channels of that type.
        channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.).
                    If specified with channels, filters the selection to only
                    channels of this type.
        inplace: If True, apply the selection to this EMG object and return it
                 instead of creating a new object (default: False).

    Returns:
        EMG: EMG object containing only the selected channels (a new object,
             or self if inplace=True)

    Examples:
        # Select specific channels
        new_emg = emg.select_channels(['EMG1', 'ACC1'])

        # Select all EMG channels
        emg_only = emg.select_channels(channel_type='EMG')

        # Select only the EMG channels from the list; the ACC channel is filtered out
        emg_subset = emg.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
    """
    if self.signals is None:
        raise ValueError("No signals loaded")

    # If channel_type specified but no channels, select all of that type
    if channels is None and channel_type is not None:
        channels = [
            ch for ch, info in self.channels.items() if info["channel_type"] == channel_type
        ]
        if not channels:
            raise ValueError(f"No channels found of type: {channel_type}")
    elif isinstance(channels, str):
        channels = [channels]

    if channels is None:
        raise ValueError("Either channels or channel_type must be specified")

    # Validate channels exist
    if not all(ch in self.signals.columns for ch in channels):
        missing = [ch for ch in channels if ch not in self.signals.columns]
        raise ValueError(f"Channels not found: {missing}")

    # Filter by type if specified
    if channel_type is not None:
        channels = [ch for ch in channels if self.channels[ch]["channel_type"] == channel_type]
        if not channels:
            raise ValueError(f"None of the selected channels are of type: {channel_type}")

    # Create new EMG object
    new_emg = EMG()

    # Copy selected signals and channels
    new_emg.signals = self.signals[channels].copy()
    new_emg.channels = {ch: self.channels[ch].copy() for ch in channels}

    # Copy metadata
    new_emg.metadata = self.metadata.copy()

    if not inplace:
        return new_emg
    else:
        self.signals = new_emg.signals
        self.channels = new_emg.channels
        self.metadata = new_emg.metadata
        return self
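The selection itself reduces to pandas column indexing plus a per-channel dict copy, so it can be seen in isolation without an EMG object. A minimal sketch with plain pandas, using hypothetical channel names:

```python
import pandas as pd

signals = pd.DataFrame({"EMG1": [0.1, 0.2], "EMG2": [0.3, 0.4], "ACC1": [1.0, 1.1]})
channels = {
    "EMG1": {"channel_type": "EMG"},
    "EMG2": {"channel_type": "EMG"},
    "ACC1": {"channel_type": "ACC"},
}

# Select all channels of one type, as select_channels(channel_type='EMG') does.
picked = [ch for ch, info in channels.items() if info["channel_type"] == "EMG"]
sub_signals = signals[picked].copy()                 # independent copy of the data
sub_channels = {ch: channels[ch].copy() for ch in picked}  # copy channel metadata
print(list(sub_signals.columns))  # ['EMG1', 'EMG2']
```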

set_metadata(key, value)

Set metadata value.

Args: key: Metadata key value: Metadata value

Source code in emgio/core/emg.py
def set_metadata(self, key: str, value: any) -> None:
    """
    Set metadata value.

    Args:
        key: Metadata key
        value: Metadata value
    """
    self.metadata[key] = value

to_edf(filepath, method='both', fft_noise_range=None, svd_rank=None, precision_threshold=0.01, format='auto', bypass_analysis=None, verify=False, verify_tolerance=1e-06, verify_channel_map=None, verify_plot=False, events_df=None, create_channels_tsv=True, **kwargs)

Export EMG data to EDF/BDF format, optionally including events.

Args:
    filepath: Path to save the EDF/BDF file
    method: Method for signal analysis ('svd', 'fft', or 'both'). 'svd' uses
        Singular Value Decomposition for noise floor estimation, 'fft' uses the
        Fast Fourier Transform, and 'both' uses both methods and takes the
        minimum noise floor (default).
    fft_noise_range: Optional tuple (min_freq, max_freq) specifying the frequency
        range treated as noise in the FFT method
    svd_rank: Optional manual rank cutoff for signal/noise separation in the SVD method
    precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
    format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'. If 'edf'
        or 'bdf' is specified, that format is used directly. If 'auto', the
        format (EDF/16-bit or BDF/24-bit) is chosen based on signal analysis to
        minimize precision loss while preferring EDF if sufficient.
    bypass_analysis: If True, skip the signal analysis step when format is
        explicitly set to 'edf' or 'bdf'. If None (default), analysis is skipped
        automatically when the format is forced. Set to False to force analysis
        even with a specified format. Ignored if format='auto'.
    verify: If True, reload the exported file and compare signals with the
        original to check for data integrity loss. Results are printed.
        (default: False)
    verify_tolerance: Absolute tolerance used when comparing signals during
        verification (default: 1e-6)
    verify_channel_map: Optional dictionary mapping original channel names (keys)
        to reloaded channel names (values) for verification. Used if verify is
        True and channel names might differ.
    verify_plot: If True and verify is True, plots a comparison of original vs
        reloaded signals.
    events_df: Optional DataFrame with events ('onset', 'duration',
        'description'). If None, uses self.events.
    create_channels_tsv: If True, create a BIDS-compliant channels.tsv file
        (default: True)
    **kwargs: Additional arguments for the EDF exporter

Returns: dict | None: If verify is True, returns a dictionary with the verification results. Otherwise, returns None.

Raises: ValueError: If no signals are loaded

Source code in emgio/core/emg.py
def to_edf(
    self,
    filepath: str,
    method: str = "both",
    fft_noise_range: tuple = None,
    svd_rank: int = None,
    precision_threshold: float = 0.01,
    format: Literal["auto", "edf", "bdf"] = "auto",
    bypass_analysis: bool | None = None,
    verify: bool = False,
    verify_tolerance: float = 1e-6,
    verify_channel_map: dict[str, str] | None = None,
    verify_plot: bool = False,
    events_df: pd.DataFrame | None = None,
    create_channels_tsv: bool = True,
    **kwargs,
) -> dict | None:
    """
    Export EMG data to EDF/BDF format, optionally including events.

    Args:
        filepath: Path to save the EDF/BDF file
        method: Method for signal analysis ('svd', 'fft', or 'both')
            'svd': Uses Singular Value Decomposition for noise floor estimation
            'fft': Uses Fast Fourier Transform for noise floor estimation
            'both': Uses both methods and takes the minimum noise floor (default)
        fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
        svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
        precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
        format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
                If 'edf' or 'bdf' is specified, that format will be used directly.
                If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based
                on signal analysis to minimize precision loss while preferring EDF
                if sufficient.
        bypass_analysis: If True, skip signal analysis step when format is explicitly
                         set to 'edf' or 'bdf'. If None (default), analysis is skipped
                         automatically when format is forced. Set to False to force
                         analysis even with a specified format. Ignored if format='auto'.
        verify: If True, reload the exported file and compare signals with the original
                to check for data integrity loss. Results are printed. (default: False)
        verify_tolerance: Absolute tolerance used when comparing signals during verification. (default: 1e-6)
        verify_channel_map: Optional dictionary mapping original channel names (keys)
                            to reloaded channel names (values) for verification.
                            Used if `verify` is True and channel names might differ.
        verify_plot: If True and verify is True, plots a comparison of original vs reloaded signals.
        events_df: Optional DataFrame with events ('onset', 'duration', 'description').
                  If None, uses self.events. (This provides flexibility)
        create_channels_tsv: If True, create a BIDS-compliant channels.tsv file (default: True)
        **kwargs: Additional arguments for the EDF exporter

    Returns:
        dict | None: If verify is True, returns a dictionary with the
                     verification results. Otherwise, returns None.

    Raises:
        ValueError: If no signals are loaded
    """
    from ..exporters.edf import EDFExporter  # Local import

    if self.signals is None:
        raise ValueError("No signals loaded")

    # --- Determine if analysis should be bypassed ---
    final_bypass_analysis = False
    if format.lower() == "auto":
        if bypass_analysis is True:
            logging.warning(
                "bypass_analysis=True ignored because format='auto'. Analysis is required."
            )
        # Analysis is always needed for 'auto' format
        final_bypass_analysis = False
    elif format.lower() in ["edf", "bdf"]:
        if bypass_analysis is None:
            # Default behaviour: skip analysis if format is forced
            final_bypass_analysis = True
            msg = (
                f"Format forced to '{format}'. Skipping signal analysis for faster export. "
                "Set bypass_analysis=False to force analysis."
            )
            logging.log(logging.CRITICAL, msg)
        elif bypass_analysis is True:
            final_bypass_analysis = True
            logging.log(logging.CRITICAL, "bypass_analysis=True set. Skipping signal analysis.")
        else:  # bypass_analysis is False
            final_bypass_analysis = False
            logging.info(
                f"Format forced to '{format}' but bypass_analysis=False. Performing signal analysis."
            )
    else:
        # Should not happen if Literal type hint works, but good practice
        logging.warning(
            f"Unknown format '{format}'. Defaulting to 'auto' behavior (analysis enabled)."
        )
        format = "auto"
        final_bypass_analysis = False

    # Determine which events DataFrame to use
    if events_df is None:
        events_to_export = self.events
    else:
        events_to_export = events_df

    # Combine parameters
    all_params = {
        "precision_threshold": precision_threshold,
        "method": method,
        "fft_noise_range": fft_noise_range,
        "svd_rank": svd_rank,
        "format": format,
        "bypass_analysis": final_bypass_analysis,
        "events_df": events_to_export,  # Pass the events dataframe
        "create_channels_tsv": create_channels_tsv,
        **kwargs,
    }

    EDFExporter.export(self, filepath, **all_params)

    verification_report_dict = None
    if verify:
        logging.info(f"Verification requested. Reloading exported file: {filepath}")
        try:
            # Reload the exported file
            reloaded_emg = EMG.from_file(filepath, importer="edf")

            logging.info("Comparing original signals with reloaded signals...")
            # Compare signals using the imported function
            verification_results = compare_signals(
                self, reloaded_emg, tolerance=verify_tolerance, channel_map=verify_channel_map
            )

            # Generate and log report using the imported function
            report_verification_results(verification_results, verify_tolerance)
            verification_report_dict = verification_results

            # Plot comparison using imported function if requested
            summary = verification_results.get("channel_summary", {})
            comparison_mode = summary.get("comparison_mode", "unknown")
            compared_count = sum(1 for k in verification_results if k != "channel_summary")

            if verify_plot and compared_count > 0 and comparison_mode != "failed":
                plot_comparison(self, reloaded_emg, channel_map=verify_channel_map)
            elif verify_plot:
                logging.warning(
                    "Skipping verification plot: No channels were successfully compared."
                )

        except Exception as e:
            logging.error(f"Verification failed during reload or comparison: {e}")
            verification_report_dict = {
                "error": str(e),
                "channel_summary": {"comparison_mode": "failed"},
            }

    return verification_report_dict
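The round-trip verification boils down to comparing each original channel with its reloaded counterpart within an absolute tolerance. A minimal numpy sketch of that per-channel check (the real logic lives in `compare_signals`, which also handles channel mapping and reporting; `channel_matches` is a hypothetical helper):

```python
import numpy as np

def channel_matches(original: np.ndarray, reloaded: np.ndarray,
                    tolerance: float = 1e-6) -> bool:
    """Return True if the reloaded signal matches the original within an
    absolute tolerance, as the verify step checks after export."""
    if original.shape != reloaded.shape:
        return False
    return bool(np.allclose(original, reloaded, atol=tolerance, rtol=0.0))

orig = np.array([0.0, 1.0, 2.0])
assert channel_matches(orig, orig + 5e-7)       # within the 1e-6 tolerance
assert not channel_matches(orig, orig + 1e-3)   # precision loss too large
```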

XDFImporter

Bases: BaseImporter

Importer for XDF (Extensible Data Format) files.

XDF files can contain multiple data streams. This importer allows selective import of specific streams by name, type, or ID.

Example:
>>> # First, explore the file
>>> from emgio.importers.xdf import summarize_xdf
>>> summary = summarize_xdf("recording.xdf")
>>> print(summary)
>>>
>>> # Import specific streams
>>> importer = XDFImporter()
>>> emg = importer.load("recording.xdf", stream_names=["EMG_stream"])
>>>
>>> # Or import by type
>>> emg = importer.load("recording.xdf", stream_types=["EMG", "EXG"])

Source code in emgio/importers/xdf.py
class XDFImporter(BaseImporter):
    """
    Importer for XDF (Extensible Data Format) files.

    XDF files can contain multiple data streams. This importer allows selective
    import of specific streams by name, type, or ID.

    Example:
        >>> # First, explore the file
        >>> from emgio.importers.xdf import summarize_xdf
        >>> summary = summarize_xdf("recording.xdf")
        >>> print(summary)
        >>>
        >>> # Import specific streams
        >>> importer = XDFImporter()
        >>> emg = importer.load("recording.xdf", stream_names=["EMG_stream"])
        >>>
        >>> # Or import by type
        >>> emg = importer.load("recording.xdf", stream_types=["EMG", "EXG"])
    """

    def load(
        self,
        filepath: str,
        stream_names: list[str] | None = None,
        stream_types: list[str] | None = None,
        stream_ids: list[int] | None = None,
        sync_streams: bool = True,
        default_channel_type: str = "EMG",
        include_timestamps: bool = False,
        reference_stream: str | None = None,
    ) -> EMG:
        """
        Load EMG data from an XDF file.

        Streams can be selected by name, type, or ID. If multiple selection
        criteria are provided, streams matching ANY criterion are included.
        If no selection criteria are provided, all streams with numeric data
        are loaded.

        Args:
            filepath: Path to the XDF file
            stream_names: List of stream names to import (case-insensitive)
            stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
            stream_ids: List of stream IDs to import
            sync_streams: If True, synchronize streams to common timestamps.
                         If False, streams are loaded without synchronization.
            default_channel_type: Default channel type for channels without
                                 explicit type info (default: "EMG")
            include_timestamps: If True, add a timestamp channel for each stream
                               named "{stream_name}_LSL_timestamps" containing
                               the original LSL timestamps. Useful for preserving
                               timing information when exporting to formats like
                               EDF that require regular sampling.
            reference_stream: Optional stream name to use as the time base
                             reference. If not specified, the stream with the
                             highest sampling rate is used (recommended to
                             avoid data loss from downsampling).

        Returns:
            EMG: EMG object containing the loaded data

        Raises:
            ValueError: If no matching streams found or file cannot be read
            ImportError: If pyxdf is not installed
        """
        try:
            import pyxdf
        except ImportError as e:
            raise ImportError(
                "pyxdf is required for XDF file support. Install it with: pip install pyxdf"
            ) from e

        filepath = str(filepath)
        data, header = pyxdf.load_xdf(filepath)

        if not data:
            raise ValueError(f"No streams found in XDF file: {filepath}")

        # Filter streams based on selection criteria
        selected_streams = self._select_streams(data, stream_names, stream_types, stream_ids)

        if not selected_streams:
            # If no criteria specified, select all streams with numeric data
            if stream_names is None and stream_types is None and stream_ids is None:
                selected_streams = [
                    s
                    for s in data
                    if isinstance(s["time_series"], np.ndarray)
                    and s["time_series"].dtype.kind in "iufc"
                ]
            if not selected_streams:
                raise ValueError(
                    "No matching streams found. Use summarize_xdf() to explore the file."
                )

        # Create EMG object
        emg = EMG()

        # Store metadata
        emg.set_metadata("source_file", filepath)
        emg.set_metadata("device", "XDF")
        emg.set_metadata("stream_count", len(selected_streams))

        if sync_streams and len(selected_streams) > 1:
            self._load_synchronized_streams(
                emg, selected_streams, default_channel_type, include_timestamps, reference_stream
            )
        else:
            # Load streams (uses highest sample rate as reference unless specified)
            self._load_streams(
                emg, selected_streams, default_channel_type, include_timestamps, reference_stream
            )

        return emg

    def _select_streams(
        self,
        data: list[dict],
        stream_names: list[str] | None,
        stream_types: list[str] | None,
        stream_ids: list[int] | None,
    ) -> list[dict]:
        """Select streams based on criteria."""
        if stream_names is None and stream_types is None and stream_ids is None:
            return []  # Return empty to trigger "all streams" behavior

        selected = []
        for stream in data:
            info = stream["info"]
            name = info["name"][0] if "name" in info else ""
            stype = info["type"][0] if "type" in info else ""
            sid = info.get("stream_id", 0)

            # Check name match (case-insensitive)
            if stream_names and any(name.lower() == n.lower() for n in stream_names):
                selected.append(stream)
                continue

            # Check type match (case-insensitive)
            if stream_types and any(stype.upper() == t.upper() for t in stream_types):
                selected.append(stream)
                continue

            # Check ID match
            if stream_ids and sid in stream_ids:
                selected.append(stream)
                continue

        return selected
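The ANY-criterion semantics can be illustrated with plain dicts: a stream is kept as soon as it matches one of the provided name, type, or ID filters. A simplified sketch (hypothetical stream records, flattened from the pyxdf info layout):

```python
# Simplified stream records; real pyxdf streams nest these under stream["info"].
streams = [
    {"name": "EMG_stream", "type": "EMG", "id": 1},
    {"name": "Markers", "type": "Markers", "id": 2},
    {"name": "Accel", "type": "ACC", "id": 3},
]

def select(streams, names=None, types=None, ids=None):
    """Keep streams matching ANY provided criterion (case-insensitive),
    mirroring the logic of _select_streams above."""
    out = []
    for s in streams:
        if names and s["name"].lower() in {n.lower() for n in names}:
            out.append(s)
            continue
        if types and s["type"].upper() in {t.upper() for t in types}:
            out.append(s)
            continue
        if ids and s["id"] in ids:
            out.append(s)
    return out

picked = select(streams, names=["emg_stream"], ids=[3])
print([s["name"] for s in picked])  # ['EMG_stream', 'Accel']
```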

    def _load_streams(
        self,
        emg: EMG,
        streams: list[dict],
        default_channel_type: str,
        include_timestamps: bool = False,
        reference_stream: str | None = None,
    ) -> None:
        """Load streams and resample to a common time base.

        By default, uses the stream with the highest sampling rate as the
        reference to avoid data loss from downsampling. A specific reference
        stream can be specified by name.
        """
        # First pass: collect stream info and find reference stream
        stream_info_list = []
        for stream in streams:
            info = stream["info"]
            stream_name = info["name"][0] if "name" in info else "Unknown"
            time_series = stream["time_series"]
            timestamps = stream["time_stamps"]

            # Skip non-numpy arrays (e.g., marker streams are lists) or non-numeric data
            if not isinstance(time_series, np.ndarray):
                continue
            if time_series.dtype.kind not in "iufc" or len(time_series) == 0:
                continue

            # Get sampling rate
            srate = stream.get("effective_srate")
            if not srate:
                srate = float(info["nominal_srate"][0]) if "nominal_srate" in info else 0.0

            stream_info_list.append(
                {
                    "stream": stream,
                    "name": stream_name,
                    "info": info,
                    "time_series": time_series,
                    "timestamps": timestamps,
                    "srate": srate,
                }
            )

        if not stream_info_list:
            raise ValueError("No valid data found in selected streams")

        # Determine reference stream: user-specified, or highest sample rate
        ref_stream_info = None
        if reference_stream:
            # Find the user-specified reference stream
            for si in stream_info_list:
                if si["name"].lower() == reference_stream.lower():
                    ref_stream_info = si
                    break
            if ref_stream_info is None:
                raise ValueError(
                    f"Reference stream '{reference_stream}' not found in selected streams. "
                    f"Available: {[si['name'] for si in stream_info_list]}"
                )
        else:
            # Use stream with highest sampling rate (avoids downsampling data loss)
            ref_stream_info = max(stream_info_list, key=lambda x: x["srate"] or 0)

        base_srate = ref_stream_info["srate"]
        base_timestamps = ref_stream_info["timestamps"]

        # Second pass: collect all channel data
        all_data = {}
        stream_timestamp_data = {}  # Store timestamp data per stream

        for si in stream_info_list:
            stream_name = si["name"]
            time_series = si["time_series"]
            timestamps = si["timestamps"]
            srate = si["srate"]
            info = si["info"]

            # Store timestamp data for this stream if requested
            if include_timestamps:
                stream_timestamp_data[stream_name] = {
                    "timestamps": timestamps,
                    "srate": srate,
                }

            # Get channel info
            channel_labels, channel_types, channel_units = self._extract_channel_info(
                info, time_series.shape[1] if time_series.ndim > 1 else 1, stream_name
            )

            # Handle 1D data (single channel)
            if time_series.ndim == 1:
                time_series = time_series.reshape(-1, 1)

            # Add channels
            for i, label in enumerate(channel_labels):
                if i < time_series.shape[1]:
                    # Make label unique if needed
                    unique_label = label
                    counter = 1
                    while unique_label in all_data:
                        unique_label = f"{label}_{counter}"
                        counter += 1

                    all_data[unique_label] = {
                        "data": time_series[:, i],
                        "timestamps": timestamps,
                        "srate": srate,
                        "unit": channel_units[i] if i < len(channel_units) else "a.u.",
                        "type": channel_types[i]
                        if i < len(channel_types) and channel_types[i]
                        else default_channel_type,
                    }

        # Create time index from reference stream
        # Convert to relative time starting from 0
        if base_timestamps is not None and len(base_timestamps) > 0:
            time_index = base_timestamps - base_timestamps[0]
        else:
            # Fallback: create time index from sample count and rate
            n_samples = len(ref_stream_info["time_series"])
            if base_srate and base_srate > 0:
                time_index = np.arange(n_samples) / base_srate
            else:
                # If no valid sample rate, use sample indices as time
                time_index = np.arange(n_samples, dtype=float)

        # Create DataFrame
        df = pd.DataFrame(index=time_index)

        for label, ch_info in all_data.items():
            # Resample if needed (different stream lengths)
            ch_data = ch_info["data"]
            ch_timestamps = ch_info["timestamps"]
            if len(ch_data) != len(time_index):
                if len(ch_timestamps) > 0:
                    # Interpolate to match base timestamps
                    relative_ch_ts = ch_timestamps - ch_timestamps[0]
                    ch_data = np.interp(time_index, relative_ch_ts, ch_data)
                else:
                    # No timestamps available to resample mismatched data
                    raise ValueError(
                        f"Length mismatch for channel '{label}': "
                        f"{len(ch_data)} samples vs {len(time_index)} time points, "
                        "and no timestamps available for interpolation."
                    )

            df[label] = ch_data

            emg.channels[label] = {
                "sample_frequency": ch_info["srate"] if ch_info["srate"] else base_srate,
                "physical_dimension": ch_info["unit"],
                "prefilter": "n/a",
                "channel_type": ch_info["type"],
            }

        # Add timestamp channels if requested
        if include_timestamps and stream_timestamp_data:
            for stream_name, ts_info in stream_timestamp_data.items():
                ts_label = f"{stream_name}_LSL_timestamps"
                original_timestamps = ts_info["timestamps"]

                # Resample timestamps to match the common time index
                if len(original_timestamps) == 0:
                    # No timestamps available; create a NaN-filled array
                    resampled_ts = np.full(len(time_index), np.nan, dtype=float)
                elif len(original_timestamps) != len(time_index):
                    relative_ts = original_timestamps - original_timestamps[0]
                    resampled_ts = np.interp(time_index, relative_ts, original_timestamps)
                else:
                    resampled_ts = original_timestamps

                df[ts_label] = resampled_ts

                emg.channels[ts_label] = {
                    "sample_frequency": ts_info["srate"] if ts_info["srate"] else base_srate,
                    "physical_dimension": "s",  # seconds
                    "prefilter": "n/a",
                    "channel_type": "MISC",  # Miscellaneous channel type
                }

        emg.signals = df
        emg.set_metadata("srate", base_srate)
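The cross-stream alignment above hinges on `np.interp`: each stream whose length differs from the reference is linearly interpolated onto the reference stream's relative time base. A minimal sketch with two synthetic streams; note that `np.interp` holds the last value for reference times past the slower stream's final sample:

```python
import numpy as np

# Reference stream at 10 Hz and a slower 5 Hz stream over the same 1 s span.
ref_ts = np.arange(0.0, 1.0, 0.1)    # relative reference timestamps
slow_ts = np.arange(0.0, 1.0, 0.2)   # slower stream's relative timestamps
slow_data = slow_ts * 2.0            # a linear signal, easy to verify

# Interpolate the slow stream onto the reference time base, as _load_streams does.
aligned = np.interp(ref_ts, slow_ts, slow_data)
assert aligned.shape == ref_ts.shape
# aligned[1] is the midpoint between samples at t=0.0 and t=0.2;
# aligned[-1] (t=0.9) is clamped to the slow stream's last value (t=0.8).
```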

    def _load_synchronized_streams(
        self,
        emg: EMG,
        streams: list[dict],
        default_channel_type: str,
        include_timestamps: bool = False,
        reference_stream: str | None = None,
    ) -> None:
        """Load streams with timestamp synchronization.

        This method is an intentional wrapper around _load_streams, serving as
        a dedicated extension point for future synchronization enhancements.
        Currently, pyxdf handles clock synchronization during file loading,
        so this delegates to _load_streams without additional processing.
        """
        self._load_streams(emg, streams, default_channel_type, include_timestamps, reference_stream)

    def _extract_channel_info(
        self,
        info: dict,
        n_channels: int,
        stream_name: str,
    ) -> tuple:
        """Extract channel labels, types, and units from stream info.

        This method safely extracts channel metadata from XDF stream info,
        handling malformed or missing metadata gracefully.
        """
        channel_labels = []
        channel_types = []
        channel_units = []

        try:
            if "desc" in info and info["desc"] and info["desc"][0]:
                desc = info["desc"][0]
                if isinstance(desc, dict) and "channels" in desc and desc["channels"]:
                    channels_info = desc["channels"][0]
                    if isinstance(channels_info, dict) and "channel" in channels_info:
                        for ch in channels_info["channel"]:
                            if isinstance(ch, dict):
                                # Safely extract label
                                label = ""
                                if "label" in ch:
                                    label_val = ch.get("label", [""])
                                    if isinstance(label_val, list) and label_val:
                                        label = str(label_val[0]) if label_val[0] else ""
                                    elif isinstance(label_val, str):
                                        label = label_val

                                # Safely extract type
                                ch_type = ""
                                if "type" in ch:
                                    type_val = ch.get("type", [""])
                                    if isinstance(type_val, list) and type_val:
                                        ch_type = str(type_val[0]) if type_val[0] else ""
                                    elif isinstance(type_val, str):
                                        ch_type = type_val

                                # Safely extract unit
                                unit = ""
                                if "unit" in ch:
                                    unit_val = ch.get("unit", [""])
                                    if isinstance(unit_val, list) and unit_val:
                                        unit = str(unit_val[0]) if unit_val[0] else ""
                                    elif isinstance(unit_val, str):
                                        unit = unit_val

                                channel_labels.append(
                                    label if label else f"{stream_name}_Ch{len(channel_labels) + 1}"
                                )
                                # Infer type from label if not explicitly provided
                                if not ch_type and label:
                                    ch_type = _determine_channel_type_from_label(label)
                                channel_types.append(ch_type)
                                # Default to a.u. (arbitrary units); specific units like uV
                                # should be provided in stream metadata
                                channel_units.append(unit if unit else "a.u.")
        except (KeyError, IndexError, TypeError, AttributeError):
            # If metadata parsing fails, we'll fall back to default labels below
            pass

        # Fill in missing labels
        while len(channel_labels) < n_channels:
            channel_labels.append(f"{stream_name}_Ch{len(channel_labels) + 1}")
            channel_types.append("")
            channel_units.append("a.u.")

        return channel_labels, channel_types, channel_units
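The defensive list/str handling above reflects how pyxdf parses the XML stream description: leaf values usually arrive as single-element lists, but some writers emit bare strings. A minimal self-contained sketch (the desc fragment and the first() helper are hypothetical, not part of the importer):

```python
# pyxdf wraps XML leaf values in single-element lists, though some
# writers emit bare strings; this is why the extraction checks both forms.
desc = {"channels": [{"channel": [
    {"label": ["EMG_biceps"], "type": ["EMG"], "unit": ["uV"]},
    {"label": "EMG_triceps"},  # bare string instead of a list
]}]}

def first(value, default=""):
    """Return the first element of a list-wrapped value, or the string itself."""
    if isinstance(value, list):
        return str(value[0]) if value else default
    return str(value) if value else default

labels = [first(ch.get("label", "")) for ch in desc["channels"][0]["channel"]]
# labels -> ["EMG_biceps", "EMG_triceps"]
```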

load(filepath, stream_names=None, stream_types=None, stream_ids=None, sync_streams=True, default_channel_type='EMG', include_timestamps=False, reference_stream=None)

Load EMG data from an XDF file.

Streams can be selected by name, type, or ID. If multiple selection criteria are provided, streams matching ANY criterion are included. If no selection criteria are provided, all streams with numeric data are loaded.

Args:
    filepath: Path to the XDF file
    stream_names: List of stream names to import (case-insensitive)
    stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
    stream_ids: List of stream IDs to import
    sync_streams: If True, synchronize streams to common timestamps. If False, streams are loaded without synchronization.
    default_channel_type: Default channel type for channels without explicit type info (default: "EMG")
    include_timestamps: If True, add a timestamp channel for each stream named "{stream_name}_LSL_timestamps" containing the original LSL timestamps. Useful for preserving timing information when exporting to formats like EDF that require regular sampling.
    reference_stream: Optional stream name to use as the time base reference. If not specified, the stream with the highest sampling rate is used (recommended to avoid data loss from downsampling).

Returns:
    EMG: EMG object containing the loaded data

Raises:
    ValueError: If no matching streams found or file cannot be read
    ImportError: If pyxdf is not installed

Source code in emgio/importers/xdf.py
def load(
    self,
    filepath: str,
    stream_names: list[str] | None = None,
    stream_types: list[str] | None = None,
    stream_ids: list[int] | None = None,
    sync_streams: bool = True,
    default_channel_type: str = "EMG",
    include_timestamps: bool = False,
    reference_stream: str | None = None,
) -> EMG:
    """
    Load EMG data from an XDF file.

    Streams can be selected by name, type, or ID. If multiple selection
    criteria are provided, streams matching ANY criterion are included.
    If no selection criteria are provided, all streams with numeric data
    are loaded.

    Args:
        filepath: Path to the XDF file
        stream_names: List of stream names to import (case-insensitive)
        stream_types: List of stream types to import (e.g., ["EMG", "EXG"])
        stream_ids: List of stream IDs to import
        sync_streams: If True, synchronize streams to common timestamps.
                     If False, streams are loaded without synchronization.
        default_channel_type: Default channel type for channels without
                             explicit type info (default: "EMG")
        include_timestamps: If True, add a timestamp channel for each stream
                           named "{stream_name}_LSL_timestamps" containing
                           the original LSL timestamps. Useful for preserving
                           timing information when exporting to formats like
                           EDF that require regular sampling.
        reference_stream: Optional stream name to use as the time base
                         reference. If not specified, the stream with the
                         highest sampling rate is used (recommended to
                         avoid data loss from downsampling).

    Returns:
        EMG: EMG object containing the loaded data

    Raises:
        ValueError: If no matching streams found or file cannot be read
        ImportError: If pyxdf is not installed
    """
    try:
        import pyxdf
    except ImportError as e:
        raise ImportError(
            "pyxdf is required for XDF file support. Install it with: pip install pyxdf"
        ) from e

    filepath = str(filepath)
    data, header = pyxdf.load_xdf(filepath)

    if not data:
        raise ValueError(f"No streams found in XDF file: {filepath}")

    # Filter streams based on selection criteria
    selected_streams = self._select_streams(data, stream_names, stream_types, stream_ids)

    if not selected_streams:
        # If no criteria specified, select all streams with numeric data
        if stream_names is None and stream_types is None and stream_ids is None:
            selected_streams = [
                s
                for s in data
                if isinstance(s["time_series"], np.ndarray)
                and s["time_series"].dtype.kind in "iufc"
            ]
        if not selected_streams:
            raise ValueError(
                "No matching streams found. Use summarize_xdf() to explore the file."
            )

    # Create EMG object
    emg = EMG()

    # Store metadata
    emg.set_metadata("source_file", filepath)
    emg.set_metadata("device", "XDF")
    emg.set_metadata("stream_count", len(selected_streams))

    if sync_streams and len(selected_streams) > 1:
        self._load_synchronized_streams(
            emg, selected_streams, default_channel_type, include_timestamps, reference_stream
        )
    else:
        # Load streams (uses highest sample rate as reference unless specified)
        self._load_streams(
            emg, selected_streams, default_channel_type, include_timestamps, reference_stream
        )

    return emg

XDFStreamInfo dataclass

Information about a single XDF stream.

Source code in emgio/importers/xdf.py
@dataclass
class XDFStreamInfo:
    """Information about a single XDF stream."""

    stream_id: int
    name: str
    stream_type: str
    channel_count: int
    nominal_srate: float
    effective_srate: float | None
    channel_format: str
    source_id: str
    hostname: str
    sample_count: int
    duration_seconds: float
    channel_labels: list[str]
    channel_types: list[str]
    channel_units: list[str]

    def __str__(self) -> str:
        """Human-readable string representation."""
        lines = [
            f"Stream {self.stream_id}: {self.name}",
            f"  Type: {self.stream_type}",
            f"  Channels: {self.channel_count}",
            f"  Nominal srate: {self.nominal_srate} Hz",
        ]
        if self.effective_srate:
            lines.append(f"  Effective srate: {self.effective_srate:.2f} Hz")
        lines.extend(
            [
                f"  Samples: {self.sample_count}",
                f"  Duration: {self.duration_seconds:.2f} s",
                f"  Format: {self.channel_format}",
            ]
        )
        if self.channel_labels:
            labels_preview = ", ".join(self.channel_labels[:5])
            if len(self.channel_labels) > 5:
                labels_preview += f", ... (+{len(self.channel_labels) - 5} more)"
            lines.append(f"  Channel labels: {labels_preview}")
        return "\n".join(lines)

__str__()

Human-readable string representation.

Source code in emgio/importers/xdf.py
def __str__(self) -> str:
    """Human-readable string representation."""
    lines = [
        f"Stream {self.stream_id}: {self.name}",
        f"  Type: {self.stream_type}",
        f"  Channels: {self.channel_count}",
        f"  Nominal srate: {self.nominal_srate} Hz",
    ]
    if self.effective_srate:
        lines.append(f"  Effective srate: {self.effective_srate:.2f} Hz")
    lines.extend(
        [
            f"  Samples: {self.sample_count}",
            f"  Duration: {self.duration_seconds:.2f} s",
            f"  Format: {self.channel_format}",
        ]
    )
    if self.channel_labels:
        labels_preview = ", ".join(self.channel_labels[:5])
        if len(self.channel_labels) > 5:
            labels_preview += f", ... (+{len(self.channel_labels) - 5} more)"
        lines.append(f"  Channel labels: {labels_preview}")
    return "\n".join(lines)

XDFSummary dataclass

Summary of an XDF file's contents.

Source code in emgio/importers/xdf.py
@dataclass
class XDFSummary:
    """Summary of an XDF file's contents."""

    filepath: str
    streams: list[XDFStreamInfo]
    header_info: dict[str, Any]

    def __str__(self) -> str:
        """Human-readable string representation."""
        lines = [
            f"XDF File: {self.filepath}",
            f"Number of streams: {len(self.streams)}",
            "",
        ]
        for stream in self.streams:
            lines.append(str(stream))
            lines.append("")
        return "\n".join(lines)

    def get_streams_by_type(self, stream_type: str) -> list[XDFStreamInfo]:
        """Get all streams of a specific type (case-insensitive)."""
        return [s for s in self.streams if s.stream_type.upper() == stream_type.upper()]

    def get_stream_by_name(self, name: str) -> XDFStreamInfo | None:
        """Get a stream by name (case-insensitive)."""
        for stream in self.streams:
            if stream.name.lower() == name.lower():
                return stream
        return None

    def get_stream_by_id(self, stream_id: int) -> XDFStreamInfo | None:
        """Get a stream by its ID."""
        for stream in self.streams:
            if stream.stream_id == stream_id:
                return stream
        return None

__str__()

Human-readable string representation.

Source code in emgio/importers/xdf.py
def __str__(self) -> str:
    """Human-readable string representation."""
    lines = [
        f"XDF File: {self.filepath}",
        f"Number of streams: {len(self.streams)}",
        "",
    ]
    for stream in self.streams:
        lines.append(str(stream))
        lines.append("")
    return "\n".join(lines)

get_stream_by_id(stream_id)

Get a stream by its ID.

Source code in emgio/importers/xdf.py
def get_stream_by_id(self, stream_id: int) -> XDFStreamInfo | None:
    """Get a stream by its ID."""
    for stream in self.streams:
        if stream.stream_id == stream_id:
            return stream
    return None

get_stream_by_name(name)

Get a stream by name (case-insensitive).

Source code in emgio/importers/xdf.py
def get_stream_by_name(self, name: str) -> XDFStreamInfo | None:
    """Get a stream by name (case-insensitive)."""
    for stream in self.streams:
        if stream.name.lower() == name.lower():
            return stream
    return None

get_streams_by_type(stream_type)

Get all streams of a specific type (case-insensitive).

Source code in emgio/importers/xdf.py
def get_streams_by_type(self, stream_type: str) -> list[XDFStreamInfo]:
    """Get all streams of a specific type (case-insensitive)."""
    return [s for s in self.streams if s.stream_type.upper() == stream_type.upper()]

_determine_channel_type_from_label(label)

Determine channel type based on label naming conventions.

Source code in emgio/importers/xdf.py
def _determine_channel_type_from_label(label: str) -> str:
    """Determine channel type based on label naming conventions."""
    label_upper = label.upper()

    if "EMG" in label_upper or "MUS" in label_upper:
        return "EMG"
    elif "ACC" in label_upper:
        return "ACC"
    elif "GYRO" in label_upper:
        return "GYRO"
    elif "EEG" in label_upper or label_upper in [
        "FP1",
        "FP2",
        "F3",
        "F4",
        "C3",
        "C4",
        "P3",
        "P4",
        "O1",
        "O2",
        "F7",
        "F8",
        "T3",
        "T4",
        "T5",
        "T6",
        "FZ",
        "CZ",
        "PZ",
        "OZ",
    ]:
        return "EEG"
    elif "ECG" in label_upper or "EKG" in label_upper:
        return "ECG"
    elif "EOG" in label_upper:
        return "EOG"
    elif "TRIG" in label_upper or "MARKER" in label_upper or "EVENT" in label_upper:
        return "TRIG"

    return ""

summarize_xdf(filepath)

Summarize the contents of an XDF file without fully loading all data.

This function loads the XDF file and extracts metadata about all streams, including channel names, types, sampling rates, and data shapes. Use this to explore an XDF file before deciding which streams to import.

Args:
    filepath: Path to the XDF file

Returns:
    XDFSummary: Object containing information about all streams in the file

Example:
    >>> summary = summarize_xdf("recording.xdf")
    >>> print(summary)
    >>> # Find EMG streams
    >>> emg_streams = summary.get_streams_by_type("EMG")

Source code in emgio/importers/xdf.py
def summarize_xdf(filepath: str | Path) -> XDFSummary:
    """
    Summarize the contents of an XDF file without fully loading all data.

    This function loads the XDF file and extracts metadata about all streams,
    including channel names, types, sampling rates, and data shapes. Use this
    to explore an XDF file before deciding which streams to import.

    Args:
        filepath: Path to the XDF file

    Returns:
        XDFSummary: Object containing information about all streams in the file

    Example:
        >>> summary = summarize_xdf("recording.xdf")
        >>> print(summary)
        >>> # Find EMG streams
        >>> emg_streams = summary.get_streams_by_type("EMG")
    """
    try:
        import pyxdf
    except ImportError as e:
        raise ImportError(
            "pyxdf is required for XDF file support. Install it with: pip install pyxdf"
        ) from e

    filepath = str(filepath)
    data, header = pyxdf.load_xdf(filepath)

    streams = []
    for stream in data:
        info = stream["info"]

        # Extract basic info
        # Note: stream_id may be missing in some XDF files; use -1 as sentinel
        stream_id = info.get("stream_id", -1)
        name = info["name"][0] if "name" in info else "Unknown"
        stream_type = info["type"][0] if "type" in info else "Unknown"
        channel_count = int(info["channel_count"][0]) if "channel_count" in info else 0
        nominal_srate = float(info["nominal_srate"][0]) if "nominal_srate" in info else 0.0
        effective_srate = stream.get("effective_srate")
        channel_format = info["channel_format"][0] if "channel_format" in info else "unknown"
        source_id = info["source_id"][0] if "source_id" in info else ""
        hostname = info["hostname"][0] if "hostname" in info else ""

        # Get data shape info - handle both numpy arrays and lists (marker streams)
        time_series = stream["time_series"]
        if isinstance(time_series, np.ndarray):
            sample_count = time_series.shape[0] if time_series.ndim > 0 else 0
        elif isinstance(time_series, list):
            sample_count = len(time_series)
        else:
            sample_count = 0

        # Calculate duration
        # For streams with 2+ timestamps, use actual time range
        # For single-sample or no-timestamp streams, estimate from sample rate or default to 0
        timestamps = stream.get("time_stamps", np.array([]))
        if len(timestamps) > 1:
            duration_seconds = timestamps[-1] - timestamps[0]
        elif effective_srate and effective_srate > 0 and sample_count > 0:
            duration_seconds = sample_count / effective_srate
        else:
            duration_seconds = 0.0

        # Extract channel info from desc
        channel_labels = []
        channel_types = []
        channel_units = []

        if "desc" in info and info["desc"] and info["desc"][0]:
            desc = info["desc"][0]
            if isinstance(desc, dict) and "channels" in desc and desc["channels"]:
                channels_info = desc["channels"][0]
                if isinstance(channels_info, dict) and "channel" in channels_info:
                    for ch in channels_info["channel"]:
                        if isinstance(ch, dict):
                            label = ch.get("label", [""])[0] if "label" in ch else ""
                            ch_type = ch.get("type", [""])[0] if "type" in ch else ""
                            unit = ch.get("unit", [""])[0] if "unit" in ch else ""
                            channel_labels.append(label)
                            channel_types.append(ch_type)
                            channel_units.append(unit)

        # If no channel info in desc, create default labels
        if not channel_labels:
            channel_labels = [f"Ch{i + 1}" for i in range(channel_count)]
            channel_types = [""] * channel_count
            channel_units = [""] * channel_count

        stream_info = XDFStreamInfo(
            stream_id=stream_id,
            name=name,
            stream_type=stream_type,
            channel_count=channel_count,
            nominal_srate=nominal_srate,
            effective_srate=effective_srate,
            channel_format=channel_format,
            source_id=source_id,
            hostname=hostname,
            sample_count=sample_count,
            duration_seconds=duration_seconds,
            channel_labels=channel_labels,
            channel_types=channel_types,
            channel_units=channel_units,
        )
        streams.append(stream_info)

    header_info = dict(header.get("info", {})) if header else {}

    return XDFSummary(filepath=filepath, streams=streams, header_info=header_info)

Usage Examples

Basic Loading

from emgio import EMG
from emgio.importers.xdf import XDFImporter

# Method 1: Using EMG.from_file (recommended)
emg = EMG.from_file('recording.xdf')

# Method 2: Using the importer directly
importer = XDFImporter()
emg = importer.load('recording.xdf')

Exploring File Contents

Before loading, explore what streams are available:

from emgio.importers.xdf import summarize_xdf

summary = summarize_xdf('recording.xdf')
print(summary)

# Output example (format follows XDFSummary.__str__ and XDFStreamInfo.__str__):
# XDF File: recording.xdf
# Number of streams: 3
#
# Stream 1: MyEEG
#   Type: EEG
#   Channels: 8
#   Nominal srate: 256.0 Hz
#   Samples: 15360
#   Duration: 60.00 s
#   Format: float32
#
# Stream 2: MyEMG
#   Type: EMG
#   Channels: 2
#   Nominal srate: 2048.0 Hz
#   Samples: 122880
#   Duration: 60.00 s
#   Format: float32
#
# Stream 3: Markers
#   Type: Markers
#   Channels: 1
#   Nominal srate: 0.0 Hz
#   Samples: 10
#   Duration: 0.00 s
#   Format: string

Selective Stream Loading

# Load only specific stream types
emg = EMG.from_file('recording.xdf', stream_types=['EMG'])

# Load multiple types
emg = EMG.from_file('recording.xdf', stream_types=['EMG', 'EEG'])

# Load by stream name
emg = EMG.from_file('recording.xdf', stream_names=['MyEMGDevice'])

# Load by stream ID
emg = EMG.from_file('recording.xdf', stream_ids=[2])

Setting Default Channel Type

# For streams without explicit channel type metadata
emg = EMG.from_file('recording.xdf', default_channel_type='EMG')

Preserving LSL Timestamps

# Include original LSL timestamps as additional channels
emg = EMG.from_file('recording.xdf', include_timestamps=True)

# Each stream gets a "{stream_name}_LSL_timestamps" channel
# Useful for synchronization with other LSL-recorded data
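A sketch of how the preserved timestamp column might be used, for example to recover the effective sampling rate from the original LSL clock. The stream name, column values, and jitter here are hypothetical; only the "{stream_name}_LSL_timestamps" naming scheme comes from the description above:

```python
import numpy as np
import pandas as pd

# Hypothetical: 1 s of a 2000 Hz stream "MyEMG", resampled to a regular
# index, with its original (slightly jittered) LSL timestamps preserved
# in a "MyEMG_LSL_timestamps" column.
n = 2000
regular_time = np.arange(n) / 2000.0
rng = np.random.default_rng(0)
lsl_ts = 1234.5 + regular_time + rng.normal(0.0, 2e-6, n)

signals = pd.DataFrame(
    {"MyEMG_Ch1": rng.standard_normal(n), "MyEMG_LSL_timestamps": lsl_ts},
    index=regular_time,
)

# The original LSL clock survives alongside the regular index, so the
# effective rate can be estimated from it after export/import round-trips:
effective_srate = 1.0 / np.diff(signals["MyEMG_LSL_timestamps"].to_numpy()).mean()
```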

File Format Support

The XDF importer supports:

  1. Single-stream and multi-stream XDF files
  2. Compressed XDF files (.xdfz)
  3. Numeric data types: float32, float64, int8, int16, int32, int64
  4. Different sampling rates across streams (with resampling)
  5. Channel labels from stream descriptors

Stream Selection Parameters

Parameter             Type       Description
--------------------  ---------  ---------------------------------------------------
stream_names          list[str]  Filter by stream names (case-insensitive)
stream_types          list[str]  Filter by stream types (e.g., "EMG", "EEG")
stream_ids            list[int]  Filter by stream IDs
default_channel_type  str        Default type for channels without explicit type
include_timestamps    bool       If True, add LSL timestamp channels for each stream
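The OR semantics of these filters (a stream is kept if it matches ANY provided criterion, as described for load() above) can be sketched as follows. The stream dicts are simplified stand-ins for pyxdf's structures, and this is not the library's actual internal implementation:

```python
def select_streams(streams, names=None, types=None, ids=None):
    """Keep streams matching ANY of the given criteria (case-insensitive)."""
    names = {n.lower() for n in (names or [])}
    types = {t.lower() for t in (types or [])}
    ids = set(ids or [])
    selected = []
    for s in streams:
        info = s["info"]
        if (info["name"].lower() in names
                or info["type"].lower() in types
                or info["stream_id"] in ids):
            selected.append(s)
    return selected

streams = [
    {"info": {"stream_id": 1, "name": "MyEEG", "type": "EEG"}},
    {"info": {"stream_id": 2, "name": "MyEMG", "type": "EMG"}},
    {"info": {"stream_id": 3, "name": "Markers", "type": "Markers"}},
]
picked = select_streams(streams, types=["emg"], ids=[3])
# picked keeps MyEMG (type match) and Markers (id match)
```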

Return Values

The load() method returns an EMG object with:

Signals (pandas.DataFrame)

  • Time-indexed signal data
  • Channels as columns
  • Resampled to common time base if multiple streams

Channels (dict)

For each channel:

  • channel_type: Inferred or default type
  • physical_dimension: Unit (default "a.u.")
  • sample_frequency: Effective sampling rate
  • stream_name: Original stream name
  • stream_id: Original stream ID
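A hypothetical entry with that shape (field names from the list above, values invented for illustration):

```python
# Hypothetical shape of one emg.channels entry after loading a stream
# named "MyEMG" with stream ID 2:
channels = {
    "MyEMG_Ch1": {
        "channel_type": "EMG",
        "physical_dimension": "a.u.",
        "sample_frequency": 2048.0,
        "stream_name": "MyEMG",
        "stream_id": 2,
    }
}
```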

Metadata (dict)

  • device: "XDF"
  • source_file: Path to the XDF file
  • stream_count: Number of streams in file
  • stream_names: List of all stream names
  • stream_types: List of all stream types

Helper Classes

XDFSummary

Provides an overview of the XDF file:

summary = summarize_xdf('recording.xdf')

# Access all streams
for stream in summary.streams:
    print(f"{stream.name}: {stream.channel_count} channels")

# Find streams by type
emg_streams = summary.get_streams_by_type('EMG')

# Find stream by name
stream = summary.get_stream_by_name('MyDevice')

XDFStreamInfo

Contains metadata for a single stream:

  • stream_id: Unique stream identifier
  • name: Stream name
  • stream_type: Stream type (EEG, EMG, etc.)
  • channel_count: Number of channels
  • nominal_srate: Declared sampling rate
  • effective_srate: Actual measured sampling rate
  • channel_format: Data format (float32, string, etc.)
  • source_id: Source identifier
  • hostname: Recording machine hostname
  • sample_count: Number of samples
  • duration_seconds: Recording duration
  • channel_labels: List of channel names

Implementation Notes

  1. String/Marker Streams: Streams with channel_format='string' are excluded from signal loading but appear in summaries.

  2. Time Alignment: When loading multiple streams, timestamps are aligned to start at 0.

  3. Resampling: Multiple streams with different rates are resampled using linear interpolation to the highest rate.

  4. Channel Naming: Channels are prefixed with stream name to avoid conflicts (e.g., "StreamName_ChannelLabel").
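The resampling in note 3 can be sketched with np.interp: a slower stream's samples are linearly interpolated onto the time base of the highest-rate stream. The rates and test signal here are hypothetical:

```python
import numpy as np

# Reference stream: 2000 Hz over 1 s; slower stream: 500 Hz, same span.
t_ref = np.arange(2000) / 2000.0
t_slow = np.arange(500) / 500.0
slow_values = np.sin(2 * np.pi * 5 * t_slow)  # 5 Hz test signal

# Linear interpolation onto the reference (highest-rate) time base,
# as described in note 3:
upsampled = np.interp(t_ref, t_slow, slow_values)

assert upsampled.shape == t_ref.shape  # one value per reference sample
```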