
CSV Importer

The CSVImporter class is responsible for importing EMG and other physiological data from generic CSV files with flexible format detection and configuration options.
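For orientation, here is a minimal usage sketch; the file name and channel labels are placeholders, and the top-level import path is assumed rather than confirmed by this page:

from emgio import EMG  # assumed import path

# sample_frequency is required when the file has no detectable time column
emg = EMG.from_file(
    'recording.csv',
    importer='csv',
    sample_frequency=1000.0,             # Hz
    channel_types={'EMG1': 'EMG'},       # optional: override inferred types
    physical_dimensions={'EMG1': 'µV'},  # optional: override default units
)
print(emg.signals.head())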

Class Documentation

emgio.importers.csv

BaseImporter

Bases: ABC

Base class for EMG data importers.

Source code in emgio/importers/base.py
class BaseImporter(ABC):
    """Base class for EMG data importers."""

    @abstractmethod
    def load(self, filepath: str) -> EMG:
        """
        Load EMG data from file.

        Args:
            filepath: Path to the input file

        Returns:
            EMG: EMG object containing the loaded data
        """
        pass

load(filepath) abstractmethod

Load EMG data from file.

Args:
    filepath: Path to the input file

Returns:
    EMG: EMG object containing the loaded data

Source code in emgio/importers/base.py
@abstractmethod
def load(self, filepath: str) -> EMG:
    """
    Load EMG data from file.

    Args:
        filepath: Path to the input file

    Returns:
        EMG: EMG object containing the loaded data
    """
    pass
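
To illustrate the contract this base class defines, here is a hedged sketch of a custom importer; the format-specific details (fixed 1 kHz rate, single header row) are assumptions for the example, not library behaviour:

import pandas as pd

from emgio.core.emg import EMG
from emgio.importers.base import BaseImporter


class MyLabImporter(BaseImporter):
    """Hypothetical importer for a lab-specific CSV layout."""

    def load(self, filepath: str) -> EMG:
        df = pd.read_csv(filepath)  # assumes one header row of channel labels
        emg = EMG()
        emg.set_metadata('source_file', filepath)
        for column in df.columns:
            emg.add_channel(
                label=column,
                data=df[column].values,
                sample_frequency=1000.0,  # assumed fixed rate for this format
                physical_dimension='µV',
                channel_type='EMG',
            )
        return emg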

CSVImporter

Bases: BaseImporter

General purpose CSV importer for EMG data.

This importer handles various columnar CSV formats: it can auto-detect headers and time columns, and it allows selection of specific columns.

Source code in emgio/importers/csv.py
class CSVImporter(BaseImporter):
    """
    General purpose CSV importer for EMG data.

    This importer can handle various CSV formats with columnar data, auto-detect
    headers, time columns, and allow for specific column selection.
    """

    def _detect_specialized_format(self, filepath: str) -> Optional[str]:
        """
        Detect if the file matches a known specialized format.

        Args:
            filepath: Path to the CSV file

        Returns:
            Name of the detected specialized format, or None if no specific format is detected
        """
        # Try to read the first few lines to check for format signatures
        try:
            with open(filepath, 'r') as f:
                header_lines = [f.readline().strip() for _ in range(20)]
                header_text = '\n'.join(header_lines)

                # Check for Trigno format signatures
                if any(marker in header_text for marker in ['Trigno', 'Delsys', 'Label:', 'X[s]']):
                    return 'trigno'

                # Additional format checks can be added here for other importers
                # For example:
                # if 'OTB' in header_text or 'Sessantaquattro' in header_text:
                #     return 'otb'

        except Exception:
            # If we can't read the file or encounter an error,
            # don't try to guess the format
            pass

        return None

    def load(self, filepath: str, force_generic: bool = False, **kwargs) -> EMG:
        """
        Load EMG data from a CSV file.

        Args:
            filepath: Path to the CSV file
            force_generic: If True, forces using the generic CSV importer even if a
                          specialized format is detected
            **kwargs: Additional options including:
                - columns: List of column names or indices to include
                - time_column: Name or index of column to use as time index (default: auto-detect)
                - has_header: Whether file has a header row (default: auto-detect)
                - skiprows: Number of rows to skip at the beginning (default: auto-detect)
                - delimiter: Column delimiter (default: auto-detect)
                - sample_frequency: Sampling frequency in Hz (required if no time column)
                - channel_types: Dict mapping column names to channel types ('EMG', 'ACC', etc.)
                - physical_dimensions: Dict mapping column names to physical dimensions
                - metadata: Dict of additional metadata to include

        Returns:
            EMG: EMG object containing the loaded data

        Raises:
            ValueError: If a specialized format is detected and force_generic is False
            FileNotFoundError: If the file does not exist
        """
        # Check if this file matches a specialized format
        if not force_generic:
            format_name = self._detect_specialized_format(filepath)
            if format_name:
                importer_messages = {
                    'trigno': (
                        "This file appears to be a Delsys Trigno CSV export. "
                        "For better metadata extraction and channel detection, use:\n\n"
                        "emg = EMG.from_file(filepath, importer='trigno')\n\n"
                        "If you still want to use the generic CSV importer, set force_generic=True:\n"
                        "importer = CSVImporter()\n"
                        "emg = importer.load(filepath, force_generic=True, **params)"
                    )
                    # Add more format-specific messages here as new importers are developed
                }

                if format_name in importer_messages:
                    raise ValueError(importer_messages[format_name])

        # Extract kwargs with defaults
        columns = kwargs.get('columns', None)
        time_column = kwargs.get('time_column', None)
        has_header = kwargs.get('has_header', None)
        skiprows = kwargs.get('skiprows', None)
        delimiter = kwargs.get('delimiter', None)
        sample_frequency = kwargs.get('sample_frequency', None)
        channel_names = kwargs.get('channel_names', [])
        channel_types = kwargs.get('channel_types', {})
        physical_dimensions = kwargs.get('physical_dimensions', {})
        metadata = kwargs.get('metadata', {})

        # Analyze file structure if parameters not explicitly provided
        try:
            if any(param is None for param in [has_header, skiprows, delimiter]):
                analyzed_params = self._analyze_csv_structure(filepath)

                # Use analyzed parameters for any not explicitly provided
                has_header = has_header if has_header is not None else analyzed_params['has_header']
                skiprows = skiprows if skiprows is not None else analyzed_params['skiprows']
                delimiter = delimiter if delimiter is not None else analyzed_params['delimiter']
        except FileNotFoundError:
            # Pass through file not found errors
            raise

        # Read the CSV file
        try:
            df = pd.read_csv(
                filepath,
                header=0 if has_header else None,
                skiprows=skiprows,
                delimiter=delimiter,
                index_col=None
            )
        except FileNotFoundError:
            # Pass through file not found errors
            raise
        except Exception as e:
            raise ValueError(f"Failed to read CSV file: {str(e)}")

        # If no header, generate column names
        if not has_header:
            df.columns = [f"Channel_{i}" for i in range(len(df.columns))]

        # If channel names are provided, use them.
        # Also, handle the case where the length of channel_names is less than the number of columns.
        if channel_names:
            if len(channel_names) < len(df.columns):
                raise ValueError(
                    "Number of channel names provided is less than the number of columns in the CSV file."
                )
            df.columns = channel_names

        # Filter columns if specified
        if columns is not None:
            if all(isinstance(col, int) for col in columns):
                # Convert numerical indices to column names
                col_names = [df.columns[i] for i in columns]
                # Save original columns for potential renumbering
                df = df[col_names]

                # If using default channel names, renumber them sequentially
                if not has_header and not channel_names:
                    # Check if these are auto-generated channel names
                    if all(col.startswith('Channel_') for col in col_names):
                        # Rename columns to be sequential
                        new_names = [f"Channel_{i}" for i in range(len(col_names))]
                        rename_map = {old: new for old, new in zip(col_names, new_names)}
                        df = df.rename(columns=rename_map)
            else:
                # Filter by column names
                df = df[columns]

        # Handle time column
        if time_column is not None:
            # If time_column is an index, convert to column name
            if isinstance(time_column, int):
                time_column = df.columns[time_column]

            # Set time column as index
            if time_column in df.columns:
                df.set_index(time_column, inplace=True)
            else:
                raise ValueError(f"Time column '{time_column}' not found in data")
        elif has_header:
            # When header exists, try to auto-detect time column only if has_header is True
            time_col = self._detect_time_column(df)
            if time_col:
                df.set_index(time_col, inplace=True)
            elif sample_frequency:
                # Create time index based on provided sampling frequency
                time_index = np.arange(len(df)) / sample_frequency
                df.index = time_index
            else:
                # No time column and no sample frequency provided
                raise ValueError(
                    "No time column detected and no sample_frequency provided. "
                    "Please specify either time_column or sample_frequency."
                )
        else:
            # For headerless data, don't attempt to auto-detect time column
            # to avoid treating the first column as time
            if sample_frequency:
                # Create time index based on provided sampling frequency
                time_index = np.arange(len(df)) / sample_frequency
                df.index = time_index
            else:
                # No sample frequency provided
                raise ValueError(
                    "No sample_frequency provided for headerless data. "
                    "Please specify sample_frequency for proper time indexing."
                )

        # Create EMG object
        emg = EMG()

        # Add metadata
        emg.set_metadata('source_file', filepath)
        emg.set_metadata('file_format', 'CSV')

        # Add any user-provided metadata
        for key, value in metadata.items():
            emg.set_metadata(key, value)

        # Default sampling frequency if not specified
        default_sample_frequency = 1000.0  # 1 kHz is a common default for EMG
        if hasattr(df.index, 'to_series'):
            # Calculate sampling frequency from time index if possible
            try:
                time_diffs = df.index.to_series().diff().dropna()
                if len(time_diffs) > 0:
                    avg_diff = time_diffs.mean()
                    if avg_diff > 0:
                        calculated_freq = 1.0 / avg_diff
                        default_sample_frequency = calculated_freq
            except Exception:
                # If calculation fails, keep default
                pass

        # Add each column as a channel
        for column in df.columns:
            # Determine channel type
            if column in channel_types:
                ch_type = channel_types[column]
            else:
                # Try to infer channel type from name
                ch_type = self._infer_channel_type(column)

            # Determine physical dimension
            if column in physical_dimensions:
                phys_dim = physical_dimensions[column]
            else:
                # Default based on channel type
                phys_dim = self._default_physical_dimension(ch_type)

            # Add the channel to the EMG object
            emg.add_channel(
                label=column,
                data=df[column].values,
                sample_frequency=sample_frequency or default_sample_frequency,
                physical_dimension=phys_dim,
                channel_type=ch_type
            )

        # Encourage user to add metadata if missing essential information
        self._print_metadata_reminder(emg)

        return emg

    def _analyze_csv_structure(self, filepath: str) -> Dict:
        """
        Analyze the CSV file structure to detect delimiter, headers, and rows to skip.

        Args:
            filepath: Path to the CSV file

        Returns:
            Dict with detected parameters:
                - delimiter: Detected delimiter character
                - has_header: Whether the file has a header row
                - skiprows: Number of rows to skip
        """
        # Default results
        results = {
            'delimiter': ',',
            'has_header': True,
            'skiprows': 0
        }

        try:
            # Read the first few lines to analyze structure
            with open(filepath, 'r') as f:
                lines = [f.readline().strip() for _ in range(30)]  # Read first 30 lines or until EOF
                lines = [line for line in lines if line]  # Remove empty lines

                # Special case for Trigno CSV format
                data_start = 0
                for i, line in enumerate(lines):
                    if 'X[s]' in line:
                        data_start = i
                        results['skiprows'] = data_start
                        results['has_header'] = True
                        break

                if data_start > 0:
                    # Found a header line with X[s], use the line after it as data
                    return results

                # If not a special format, continue with regular analysis
                # Count occurrences of each delimiter and choose the most common one
                delimiters = {',': 0, '\t': 0, ';': 0, '|': 0}

                for line in lines[:5]:  # Check first 5 lines
                    if not line or line.startswith('#'):
                        continue

                    for delim in delimiters:
                        if delim in line:
                            # Count occurrences but also consider how many fields it creates
                            fields = line.split(delim)
                            if len(fields) > 1:  # Must create at least 2 fields to be valid
                                delimiters[delim] += len(fields)

                # Choose the delimiter that creates the most fields
                if any(delimiters.values()):
                    most_common = max(delimiters.items(), key=lambda x: x[1])
                    results['delimiter'] = most_common[0]

                # Infer if file has a header by checking if first row looks different from data rows
                if len(lines) >= 2:
                    possible_header = lines[0]
                    possible_data = lines[1]

                    # If first row contains alphabetic characters and data rows are numeric
                    header_values = possible_header.split(results['delimiter'])
                    data_values = possible_data.split(results['delimiter'])

                    # Check for alpha chars in header
                    has_alpha = any(any(c.isalpha() for c in val) for val in header_values if val.strip())
                    # Check if data rows are numeric
                    numeric_data = all(self._is_numeric(val) for val in data_values if val.strip())

                    if has_alpha and numeric_data:
                        results['has_header'] = True
                    else:
                        # If no clear distinction, assume no header if all fields look numeric
                        results['has_header'] = not all(self._is_numeric(val) for val in header_values if val.strip())

        except Exception:
            # If analysis fails, return defaults
            pass

        return results

    def _is_numeric(self, value: str) -> bool:
        """Check if a string value is numeric."""
        try:
            float(value)
            return True
        except ValueError:
            return False

    def _detect_time_column(self, df: pd.DataFrame) -> Optional[str]:
        """
        Try to detect which column represents time.

        Args:
            df: DataFrame with loaded data

        Returns:
            Name of detected time column or None if not found
        """
        time_keywords = ['time', 'second', 'seconds', 's']

        # Check column names for time keywords
        for col in df.columns:
            col_lower = col.lower()
            if any(keyword in col_lower for keyword in time_keywords):
                return col

        # Check if first column is monotonically increasing (typical for time)
        first_col = df.columns[0]
        if len(df) > 1 and pd.Series(df[first_col]).is_monotonic_increasing:
            # Check if the values are plausible time values (e.g., not all integers if diff is small)
            if df[first_col].dtype in [np.float64, np.float32]:
                return first_col
            elif df[first_col].diff().dropna().mean() > 1e-9:  # Avoid treating integer indices as time
                return first_col

        return None

    def _infer_channel_type(self, column_name: str) -> str:
        """
        Infer channel type from column name.

        Args:
            column_name: Name of the column

        Returns:
            Inferred channel type
        """
        name_lower = column_name.lower()

        if any(keyword in name_lower for keyword in ['emg', 'muscle']):
            return 'EMG'
        elif any(keyword in name_lower for keyword in ['acc', 'accel']):
            return 'ACC'
        elif any(keyword in name_lower for keyword in ['gyro']):
            return 'GYRO'
        elif any(keyword in name_lower for keyword in ['time', 'second']):
            return 'TIME'  # Might be redundant if used as index, but useful for metadata
        else:
            return 'OTHER'

    def _default_physical_dimension(self, channel_type: str) -> str:
        """
        Return default physical dimension for a channel type.

        Args:
            channel_type: Type of channel

        Returns:
            Default physical dimension
        """
        dimensions = {
            'EMG': 'µV',
            'ACC': 'g',
            'GYRO': 'deg/s',
            'TIME': 's',
            'OTHER': 'a.u.'
        }
        return dimensions.get(channel_type, 'a.u.')

    def _print_metadata_reminder(self, emg: EMG) -> None:
        """
        Print a reminder to add metadata if essential information is missing.

        Args:
            emg: EMG object to check
        """
        essential_metadata = ['subject', 'device', 'recording_date']
        missing = [meta for meta in essential_metadata if meta not in emg.metadata]

        if missing:
            print("[INFO] Reminder: Consider adding essential metadata for better context:")
            for meta in missing:
                print(f"  emg.set_metadata('{meta}', '<Your {meta.replace('_', ' ').title()}>')")
            print("Example: emg.set_metadata('subject', 'S001')")

load(filepath, force_generic=False, **kwargs)

Load EMG data from a CSV file.

Args:
    filepath: Path to the CSV file
    force_generic: If True, forces using the generic CSV importer even if a
        specialized format is detected
    **kwargs: Additional options including:
        - columns: List of column names or indices to include
        - time_column: Name or index of column to use as time index (default: auto-detect)
        - has_header: Whether file has a header row (default: auto-detect)
        - skiprows: Number of rows to skip at the beginning (default: auto-detect)
        - delimiter: Column delimiter (default: auto-detect)
        - sample_frequency: Sampling frequency in Hz (required if no time column)
        - channel_names: List of names to assign to the columns (must cover every column)
        - channel_types: Dict mapping column names to channel types ('EMG', 'ACC', etc.)
        - physical_dimensions: Dict mapping column names to physical dimensions
        - metadata: Dict of additional metadata to include

Returns:
    EMG: EMG object containing the loaded data

Raises:
    ValueError: If a specialized format is detected and force_generic is False
    FileNotFoundError: If the file does not exist
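
A minimal sketch of calling load with explicit parsing options, assuming a headerless, semicolon-delimited three-column file at a hypothetical path (channel_names must cover every column):

from emgio.importers.csv import CSVImporter

importer = CSVImporter()
emg = importer.load(
    'headerless.csv',
    has_header=False,         # skip header auto-detection
    delimiter=';',
    sample_frequency=2000.0,  # required: headerless data has no time column
    channel_names=['BicepsL', 'BicepsR', 'TricepsL'],
    channel_types={'BicepsL': 'EMG', 'BicepsR': 'EMG', 'TricepsL': 'EMG'},
)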

Source code in emgio/importers/csv.py
def load(self, filepath: str, force_generic: bool = False, **kwargs) -> EMG:
    """
    Load EMG data from a CSV file.

    Args:
        filepath: Path to the CSV file
        force_generic: If True, forces using the generic CSV importer even if a
                      specialized format is detected
        **kwargs: Additional options including:
            - columns: List of column names or indices to include
            - time_column: Name or index of column to use as time index (default: auto-detect)
            - has_header: Whether file has a header row (default: auto-detect)
            - skiprows: Number of rows to skip at the beginning (default: auto-detect)
            - delimiter: Column delimiter (default: auto-detect)
            - sample_frequency: Sampling frequency in Hz (required if no time column)
            - channel_types: Dict mapping column names to channel types ('EMG', 'ACC', etc.)
            - physical_dimensions: Dict mapping column names to physical dimensions
            - metadata: Dict of additional metadata to include

    Returns:
        EMG: EMG object containing the loaded data

    Raises:
        ValueError: If a specialized format is detected and force_generic is False
        FileNotFoundError: If the file does not exist
    """
    # Check if this file matches a specialized format
    if not force_generic:
        format_name = self._detect_specialized_format(filepath)
        if format_name:
            importer_messages = {
                'trigno': (
                    "This file appears to be a Delsys Trigno CSV export. "
                    "For better metadata extraction and channel detection, use:\n\n"
                    "emg = EMG.from_file(filepath, importer='trigno')\n\n"
                    "If you still want to use the generic CSV importer, set force_generic=True:\n"
                    "importer = CSVImporter()\n"
                    "emg = importer.load(filepath, force_generic=True, **params)"
                )
                # Add more format-specific messages here as new importers are developed
            }

            if format_name in importer_messages:
                raise ValueError(importer_messages[format_name])

    # Extract kwargs with defaults
    columns = kwargs.get('columns', None)
    time_column = kwargs.get('time_column', None)
    has_header = kwargs.get('has_header', None)
    skiprows = kwargs.get('skiprows', None)
    delimiter = kwargs.get('delimiter', None)
    sample_frequency = kwargs.get('sample_frequency', None)
    channel_names = kwargs.get('channel_names', [])
    channel_types = kwargs.get('channel_types', {})
    physical_dimensions = kwargs.get('physical_dimensions', {})
    metadata = kwargs.get('metadata', {})

    # Analyze file structure if parameters not explicitly provided
    try:
        if any(param is None for param in [has_header, skiprows, delimiter]):
            analyzed_params = self._analyze_csv_structure(filepath)

            # Use analyzed parameters for any not explicitly provided
            has_header = has_header if has_header is not None else analyzed_params['has_header']
            skiprows = skiprows if skiprows is not None else analyzed_params['skiprows']
            delimiter = delimiter if delimiter is not None else analyzed_params['delimiter']
    except FileNotFoundError:
        # Pass through file not found errors
        raise

    # Read the CSV file
    try:
        df = pd.read_csv(
            filepath,
            header=0 if has_header else None,
            skiprows=skiprows,
            delimiter=delimiter,
            index_col=None
        )
    except FileNotFoundError:
        # Pass through file not found errors
        raise
    except Exception as e:
        raise ValueError(f"Failed to read CSV file: {str(e)}")

    # If no header, generate column names
    if not has_header:
        df.columns = [f"Channel_{i}" for i in range(len(df.columns))]

    # If channel names are provided, use them.
    # Also, handle the case where the length of channel_names is less than the number of columns.
    if channel_names:
        if len(channel_names) < len(df.columns):
            raise ValueError(
                "Number of channel names provided is less than the number of columns in the CSV file."
            )
        df.columns = channel_names

    # Filter columns if specified
    if columns is not None:
        if all(isinstance(col, int) for col in columns):
            # Convert numerical indices to column names
            col_names = [df.columns[i] for i in columns]
            # Save original columns for potential renumbering
            df = df[col_names]

            # If using default channel names, renumber them sequentially
            if not has_header and not channel_names:
                # Check if these are auto-generated channel names
                if all(col.startswith('Channel_') for col in col_names):
                    # Rename columns to be sequential
                    new_names = [f"Channel_{i}" for i in range(len(col_names))]
                    rename_map = {old: new for old, new in zip(col_names, new_names)}
                    df = df.rename(columns=rename_map)
        else:
            # Filter by column names
            df = df[columns]

    # Handle time column
    if time_column is not None:
        # If time_column is an index, convert to column name
        if isinstance(time_column, int):
            time_column = df.columns[time_column]

        # Set time column as index
        if time_column in df.columns:
            df.set_index(time_column, inplace=True)
        else:
            raise ValueError(f"Time column '{time_column}' not found in data")
    elif has_header:
        # When header exists, try to auto-detect time column only if has_header is True
        time_col = self._detect_time_column(df)
        if time_col:
            df.set_index(time_col, inplace=True)
        elif sample_frequency:
            # Create time index based on provided sampling frequency
            time_index = np.arange(len(df)) / sample_frequency
            df.index = time_index
        else:
            # No time column and no sample frequency provided
            raise ValueError(
                "No time column detected and no sample_frequency provided. "
                "Please specify either time_column or sample_frequency."
            )
    else:
        # For headerless data, don't attempt to auto-detect time column
        # to avoid treating the first column as time
        if sample_frequency:
            # Create time index based on provided sampling frequency
            time_index = np.arange(len(df)) / sample_frequency
            df.index = time_index
        else:
            # No sample frequency provided
            raise ValueError(
                "No sample_frequency provided for headerless data. "
                "Please specify sample_frequency for proper time indexing."
            )

    # Create EMG object
    emg = EMG()

    # Add metadata
    emg.set_metadata('source_file', filepath)
    emg.set_metadata('file_format', 'CSV')

    # Add any user-provided metadata
    for key, value in metadata.items():
        emg.set_metadata(key, value)

    # Default sampling frequency if not specified
    default_sample_frequency = 1000.0  # 1 kHz is a common default for EMG
    if hasattr(df.index, 'to_series'):
        # Calculate sampling frequency from time index if possible
        try:
            time_diffs = df.index.to_series().diff().dropna()
            if len(time_diffs) > 0:
                avg_diff = time_diffs.mean()
                if avg_diff > 0:
                    calculated_freq = 1.0 / avg_diff
                    default_sample_frequency = calculated_freq
        except Exception:
            # If calculation fails, keep default
            pass

    # Add each column as a channel
    for column in df.columns:
        # Determine channel type
        if column in channel_types:
            ch_type = channel_types[column]
        else:
            # Try to infer channel type from name
            ch_type = self._infer_channel_type(column)

        # Determine physical dimension
        if column in physical_dimensions:
            phys_dim = physical_dimensions[column]
        else:
            # Default based on channel type
            phys_dim = self._default_physical_dimension(ch_type)

        # Add the channel to the EMG object
        emg.add_channel(
            label=column,
            data=df[column].values,
            sample_frequency=sample_frequency or default_sample_frequency,
            physical_dimension=phys_dim,
            channel_type=ch_type
        )

    # Encourage user to add metadata if missing essential information
    self._print_metadata_reminder(emg)

    return emg

EMG

Core EMG class for handling EMG data and metadata.

Attributes:
    signals (pd.DataFrame): Raw signal data with time as index.
    metadata (dict): Metadata dictionary containing recording information.
    channels (dict): Channel information including type, unit, sampling frequency.
    events (pd.DataFrame): Annotations or events associated with the signals,
        with columns 'onset', 'duration', 'description'.
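
A short sketch of inspecting these attributes on a loaded object (the file path is a placeholder):

emg = EMG.from_file('recording.csv', importer='csv', sample_frequency=1000.0)
print(emg.signals.shape)            # (n_samples, n_channels), time as index
print(list(emg.channels))           # channel labels
print(emg.events.columns.tolist())  # ['onset', 'duration', 'description']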

Source code in emgio/core/emg.py
class EMG:
    """
    Core EMG class for handling EMG data and metadata.

    Attributes:
        signals (pd.DataFrame): Raw signal data with time as index.
        metadata (dict): Metadata dictionary containing recording information.
        channels (dict): Channel information including type, unit, sampling frequency.
        events (pd.DataFrame): Annotations or events associated with the signals,
                               with columns 'onset', 'duration', 'description'.
    """

    def __init__(self):
        """Initialize an empty EMG object."""
        self.signals = None
        self.metadata = {}
        self.channels = {}
        # Initialize events as an empty DataFrame with specified columns
        self.events = pd.DataFrame(columns=['onset', 'duration', 'description'])

    def plot_signals(self, channels=None, time_range=None, offset_scale=0.8,
                    uniform_scale=True, detrend=False, grid=True, title=None,
                    show=True, plt_module=None):
        """
        Plot EMG signals in a single plot with vertical offsets.

        Args:
            channels: List of channels to plot. If None, plot all channels.
            time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
            offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
            uniform_scale: Whether to use the same scale for all signals.
            detrend: Whether to remove mean from signals before plotting.
            grid: Whether to show grid lines.
            title: Optional title for the figure.
            show: Whether to display the plot.
            plt_module: Matplotlib pyplot module to use.
        """
        # Delegate to the static plotting function in visualization module
        static_plot_signals(
            emg_object=self,
            channels=channels,
            time_range=time_range,
            offset_scale=offset_scale,
            uniform_scale=uniform_scale,
            detrend=detrend,
            grid=grid,
            title=title,
            show=show,
            plt_module=plt_module
        )

    @classmethod
    def _infer_importer(cls, filepath: str) -> str:
        """
        Infer the importer to use based on the file extension.
        """
        extension = os.path.splitext(filepath)[1].lower()
        if extension in {'.edf', '.bdf'}:
            return 'edf'
        elif extension in {'.set'}:
            return 'eeglab'
        elif extension in {'.otb', '.otb+'}:
            return 'otb'
        elif extension in {'.csv', '.txt'}:
            return 'csv'
        elif extension in {'.hea', '.dat', '.atr'}:
            return 'wfdb'
        else:
            raise ValueError(f"Unsupported file extension: {extension}")

    @classmethod
    def from_file(
            cls,
            filepath: str,
            importer: Literal['trigno', 'otb', 'eeglab', 'edf', 'csv', 'wfdb'] | None = None,
            force_csv: bool = False,
            **kwargs
    ) -> 'EMG':
        """
        The method to create EMG object from file.

        Args:
            filepath: Path to the input file
            importer: Name of the importer to use. Can be one of the following:
                - 'trigno': Delsys Trigno EMG system (CSV)
                - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
                - 'eeglab': EEGLAB .set files (SET)
                - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
                - 'csv': Generic CSV (or TXT) files with columnar data
                - 'wfdb': Waveform Database (WFDB)
                If None, the importer will be inferred from the file extension.
                Automatic import is supported for CSV/TXT files.
            force_csv: If True and importer is 'csv', forces using the generic CSV
                      importer even if the file appears to match a specialized format.
            **kwargs: Additional arguments passed to the importer

        Returns:
            EMG: New EMG object with loaded data
        """
        if importer is None:
            importer = cls._infer_importer(filepath)

        importers = {
            'trigno': 'TrignoImporter',  # CSV with Delsys Trigno Headers
            'otb': 'OTBImporter',  # OTB/OTB+ EMG system data
            'edf': 'EDFImporter',  # EDF/EDF+/BDF format
            'eeglab': 'EEGLABImporter',  # EEGLAB .set files
            'csv': 'CSVImporter',  # Generic CSV/Text files
            'wfdb': 'WFDBImporter'  # Waveform Database format
        }

        if importer not in importers:
            raise ValueError(
                f"Unsupported importer: {importer}. "
                f"Available importers: {list(importers.keys())}\n"
                "- trigno: Delsys Trigno EMG system\n"
                "- otb: OTB/OTB+ EMG system\n"
                "- edf: EDF/EDF+/BDF format\n"
                "- eeglab: EEGLAB .set files\n"
                "- csv: Generic CSV/Text files\n"
                "- wfdb: Waveform Database"
            )

        # If using CSV importer and force_csv is set, pass it as force_generic
        if importer == 'csv':
            kwargs['force_generic'] = force_csv

        # Import the appropriate importer class
        importer_module = __import__(
            f'emgio.importers.{importer}',
            globals(),
            locals(),
            [importers[importer]]
        )
        importer_class = getattr(importer_module, importers[importer])

        # Create importer instance and load data
        return importer_class().load(filepath, **kwargs)

    def select_channels(
            self,
            channels: Union[str, List[str], None] = None,
            channel_type: Optional[str] = None,
            inplace: bool = False) -> 'EMG':
        """
        Select specific channels from the data and return a new EMG object.

        Args:
            channels: Channel name or list of channel names to select. If None and
                    channel_type is specified, selects all channels of that type.
            channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.).
                        If specified with channels, filters the selection to only
                        channels of this type.

        Returns:
            EMG: A new EMG object containing only the selected channels

        Examples:
            # Select specific channels
            new_emg = emg.select_channels(['EMG1', 'ACC1'])

            # Select all EMG channels
            emg_only = emg.select_channels(channel_type='EMG')

            # Select specific EMG channels only, this example does not select ACC channels
            emg_subset = emg.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
        """
        if self.signals is None:
            raise ValueError("No signals loaded")

        # If channel_type specified but no channels, select all of that type
        if channels is None and channel_type is not None:
            channels = [ch for ch, info in self.channels.items()
                        if info['channel_type'] == channel_type]
            if not channels:
                raise ValueError(f"No channels found of type: {channel_type}")
        elif isinstance(channels, str):
            channels = [channels]

        # Validate channels exist
        if not all(ch in self.signals.columns for ch in channels):
            missing = [ch for ch in channels if ch not in self.signals.columns]
            raise ValueError(f"Channels not found: {missing}")

        # Filter by type if specified
        if channel_type is not None:
            channels = [ch for ch in channels
                        if self.channels[ch]['channel_type'] == channel_type]
            if not channels:
                raise ValueError(
                    f"None of the selected channels are of type: {channel_type}")

        # Create new EMG object
        new_emg = EMG()

        # Copy selected signals and channels
        new_emg.signals = self.signals[channels].copy()
        new_emg.channels = {ch: self.channels[ch].copy() for ch in channels}

        # Copy metadata
        new_emg.metadata = self.metadata.copy()

        if not inplace:
            return new_emg
        else:
            self.signals = new_emg.signals
            self.channels = new_emg.channels
            self.metadata = new_emg.metadata
            return self

    def get_channel_types(self) -> List[str]:
        """
        Get list of unique channel types in the data.

        Returns:
            List of channel types (e.g., ['EMG', 'ACC', 'GYRO'])
        """
        return list(set(info['channel_type'] for info in self.channels.values()))

    def get_channels_by_type(self, channel_type: str) -> List[str]:
        """
        Get list of channels of a specific type.

        Args:
            channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

        Returns:
            List of channel names of the specified type
        """
        return [ch for ch, info in self.channels.items()
                if info['channel_type'] == channel_type]

    def to_edf(self, filepath: str, method: str = 'both',
               fft_noise_range: tuple = None, svd_rank: int = None,
               precision_threshold: float = 0.01,
               format: Literal['auto', 'edf', 'bdf'] = 'auto',
               bypass_analysis: bool | None = None,
               verify: bool = False, verify_tolerance: float = 1e-6,
               verify_channel_map: Optional[Dict[str, str]] = None,
               verify_plot: bool = False,
               events_df: Optional[pd.DataFrame] = None,
               **kwargs
               ) -> Union[str, None]:
        """
        Export EMG data to EDF/BDF format, optionally including events.

        Args:
            filepath: Path to save the EDF/BDF file
            method: Method for signal analysis ('svd', 'fft', or 'both')
                'svd': Uses Singular Value Decomposition for noise floor estimation
                'fft': Uses Fast Fourier Transform for noise floor estimation
                'both': Uses both methods and takes the minimum noise floor (default)
            fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
            svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
            precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
            format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
                    If 'edf' or 'bdf' is specified, that format will be used directly.
                    If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based
                    on signal analysis to minimize precision loss while preferring EDF
                    if sufficient.
            bypass_analysis: If True, skip signal analysis step when format is explicitly
                             set to 'edf' or 'bdf'. If None (default), analysis is skipped
                             automatically when format is forced. Set to False to force
                             analysis even with a specified format. Ignored if format='auto'.
            verify: If True, reload the exported file and compare signals with the original
                    to check for data integrity loss. Results are printed. (default: False)
            verify_tolerance: Absolute tolerance used when comparing signals during verification. (default: 1e-6)
            verify_channel_map: Optional dictionary mapping original channel names (keys)
                                to reloaded channel names (values) for verification.
                                Used if `verify` is True and channel names might differ.
            verify_plot: If True and verify is True, plots a comparison of original vs reloaded signals.
            events_df: Optional DataFrame with events ('onset', 'duration', 'description').
                      If None, uses self.events. (This provides flexibility)
            **kwargs: Additional arguments for the EDF exporter

        Returns:
            Union[str, None]: If verify is True, returns a string with verification results.
                             Otherwise, returns None.

        Raises:
            ValueError: If no signals are loaded
        """
        from ..exporters.edf import EDFExporter  # Local import

        if self.signals is None:
            raise ValueError("No signals loaded")

        # --- Determine if analysis should be bypassed ---
        final_bypass_analysis = False
        if format.lower() == 'auto':
            if bypass_analysis is True:
                logging.warning("bypass_analysis=True ignored because format='auto'. Analysis is required.")
            # Analysis is always needed for 'auto' format
            final_bypass_analysis = False
        elif format.lower() in ['edf', 'bdf']:
            if bypass_analysis is None:
                # Default behaviour: skip analysis if format is forced
                final_bypass_analysis = True
                msg = (f"Format forced to '{format}'. Skipping signal analysis for faster export. "
                       "Set bypass_analysis=False to force analysis.")
                logging.log(logging.CRITICAL, msg)
            elif bypass_analysis is True:
                final_bypass_analysis = True
                logging.log(logging.CRITICAL, "bypass_analysis=True set. Skipping signal analysis.")
            else:  # bypass_analysis is False
                final_bypass_analysis = False
                logging.info(f"Format forced to '{format}' but bypass_analysis=False. Performing signal analysis.")
        else:
            # Should not happen if Literal type hint works, but good practice
            logging.warning(f"Unknown format '{format}'. Defaulting to 'auto' behavior (analysis enabled).")
            format = 'auto'
            final_bypass_analysis = False

        # Determine which events DataFrame to use
        if events_df is None:
            events_to_export = self.events
        else:
            events_to_export = events_df

        # Combine parameters
        all_params = {
            'precision_threshold': precision_threshold,
            'method': method,
            'fft_noise_range': fft_noise_range,
            'svd_rank': svd_rank,
            'format': format,
            'bypass_analysis': final_bypass_analysis,
            'events_df': events_to_export,  # Pass the events dataframe
            **kwargs
        }

        EDFExporter.export(self, filepath, **all_params)

        verification_report_dict = None
        if verify:
            logging.info(f"Verification requested. Reloading exported file: {filepath}")
            try:
                # Reload the exported file
                reloaded_emg = EMG.from_file(filepath, importer='edf')

                logging.info("Comparing original signals with reloaded signals...")
                # Compare signals using the imported function
                verification_results = compare_signals(
                    self,
                    reloaded_emg,
                    tolerance=verify_tolerance,
                    channel_map=verify_channel_map
                )

                # Generate and log report using the imported function
                report_verification_results(verification_results, verify_tolerance)
                verification_report_dict = verification_results

                # Plot comparison using imported function if requested
                summary = verification_results.get('channel_summary', {})
                comparison_mode = summary.get('comparison_mode', 'unknown')
                compared_count = sum(1 for k in verification_results if k != 'channel_summary')

                if verify_plot and compared_count > 0 and comparison_mode != 'failed':
                    plot_comparison(self, reloaded_emg, channel_map=verify_channel_map)
                elif verify_plot:
                    logging.warning("Skipping verification plot: No channels were successfully compared.")

            except Exception as e:
                logging.error(f"Verification failed during reload or comparison: {e}")
                verification_report_dict = {
                    'error': str(e),
                    'channel_summary': {'comparison_mode': 'failed'}
                }

        return verification_report_dict

    def set_metadata(self, key: str, value: any) -> None:
        """
        Set metadata value.

        Args:
            key: Metadata key
            value: Metadata value
        """
        self.metadata[key] = value

    def get_metadata(self, key: str) -> any:
        """
        Get metadata value.

        Args:
            key: Metadata key

        Returns:
            Value associated with the key
        """
        return self.metadata.get(key)

    def add_channel(
            self, label: str, data: np.ndarray, sample_frequency: float,
            physical_dimension: str, prefilter: str = 'n/a', channel_type: str = 'EMG') -> None:
        """
        Add a new channel to the EMG data.

        Args:
            label: Channel label or name (as per EDF specification)
            data: Channel data
            sample_frequency: Sampling frequency in Hz (as per EDF specification)
            physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
            prefilter: Pre-filtering applied to the channel
            channel_type: Channel type ('EMG', 'ACC', 'GYRO', etc.)
        """
        if self.signals is None:
            # Create DataFrame with time index
            time = np.arange(len(data)) / sample_frequency
            self.signals = pd.DataFrame(index=time)

        self.signals[label] = data
        self.channels[label] = {
            'sample_frequency': sample_frequency,
            'physical_dimension': physical_dimension,
            'prefilter': prefilter,
            'channel_type': channel_type
        }

    def add_event(self, onset: float, duration: float, description: str) -> None:
        """
        Add an event/annotation to the EMG object.

        Args:
            onset: Event onset time in seconds.
            duration: Event duration in seconds.
            description: Event description string.
        """
        new_event = pd.DataFrame([{'onset': onset, 'duration': duration, 'description': description}])
        # Use pd.concat for appending, ignore_index=True resets the index
        self.events = pd.concat([self.events, new_event], ignore_index=True)
        # Sort events by onset time for consistency
        self.events.sort_values(by='onset', inplace=True)
        self.events.reset_index(drop=True, inplace=True)

__init__()

Source code in emgio/core/emg.py
def __init__(self):
    """Initialize an empty EMG object."""
    self.signals = None
    self.metadata = {}
    self.channels = {}
    # Initialize events as an empty DataFrame with specified columns
    self.events = pd.DataFrame(columns=['onset', 'duration', 'description'])

add_channel(label, data, sample_frequency, physical_dimension, prefilter='n/a', channel_type='EMG')

Add a new channel to the EMG data.

Args:
    label: Channel label or name (as per EDF specification)
    data: Channel data
    sample_frequency: Sampling frequency in Hz (as per EDF specification)
    physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
    prefilter: Pre-filtering applied to the channel
    channel_type: Channel type ('EMG', 'ACC', 'GYRO', etc.)

Source code in emgio/core/emg.py
def add_channel(
        self, label: str, data: np.ndarray, sample_frequency: float,
        physical_dimension: str, prefilter: str = 'n/a', channel_type: str = 'EMG') -> None:
    """
    Add a new channel to the EMG data.

    Args:
        label: Channel label or name (as per EDF specification)
        data: Channel data
        sample_frequency: Sampling frequency in Hz (as per EDF specification)
        physical_dimension: Physical dimension/unit of measurement (as per EDF specification)
        prefilter: Pre-filtering applied to the channel
        channel_type: Channel type ('EMG', 'ACC', 'GYRO', etc.)
    """
    if self.signals is None:
        # Create DataFrame with time index
        time = np.arange(len(data)) / sample_frequency
        self.signals = pd.DataFrame(index=time)

    self.signals[label] = data
    self.channels[label] = {
        'sample_frequency': sample_frequency,
        'physical_dimension': physical_dimension,
        'prefilter': prefilter,
        'channel_type': channel_type
    }
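
A usage sketch with synthetic data; the label and signal are illustrative only:

import numpy as np

from emgio.core.emg import EMG

emg = EMG()
t = np.arange(0, 1, 1 / 1000.0)  # 1 s of samples at 1 kHz
emg.add_channel(
    label='EMG1',
    data=np.sin(2 * np.pi * 50 * t),  # synthetic 50 Hz waveform
    sample_frequency=1000.0,
    physical_dimension='µV',
    channel_type='EMG',
)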

add_event(onset, duration, description)

Add an event/annotation to the EMG object.

Args:
    onset: Event onset time in seconds.
    duration: Event duration in seconds.
    description: Event description string.

Source code in emgio/core/emg.py
def add_event(self, onset: float, duration: float, description: str) -> None:
    """
    Add an event/annotation to the EMG object.

    Args:
        onset: Event onset time in seconds.
        duration: Event duration in seconds.
        description: Event description string.
    """
    new_event = pd.DataFrame([{'onset': onset, 'duration': duration, 'description': description}])
    # Use pd.concat for appending, ignore_index=True resets the index
    self.events = pd.concat([self.events, new_event], ignore_index=True)
    # Sort events by onset time for consistency
    self.events.sort_values(by='onset', inplace=True)
    self.events.reset_index(drop=True, inplace=True)
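
For example, marking a hypothetical contraction starting at 2.0 s and lasting 1.5 s:

emg.add_event(onset=2.0, duration=1.5, description='isometric contraction')
# events is kept sorted by onset, with columns
# 'onset', 'duration' and 'description'
print(emg.events)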

from_file(filepath, importer=None, force_csv=False, **kwargs) classmethod

Create an EMG object from a file.

Args:
    filepath: Path to the input file
    importer: Name of the importer to use. Can be one of the following:
        - 'trigno': Delsys Trigno EMG system (CSV)
        - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
        - 'eeglab': EEGLAB .set files (SET)
        - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
        - 'csv': Generic CSV (or TXT) files with columnar data
        - 'wfdb': Waveform Database (WFDB)
        If None, the importer will be inferred from the file extension.
        Automatic import is supported for CSV/TXT files.
    force_csv: If True and importer is 'csv', forces using the generic CSV
        importer even if the file appears to match a specialized format.
    **kwargs: Additional arguments passed to the importer

Returns:
    EMG: New EMG object with loaded data
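
Two hedged examples, with placeholder file names: one relying on extension-based inference, one forcing the generic CSV importer:

# Importer inferred from the '.edf' extension
emg = EMG.from_file('session01.edf')

# Explicit importer; force_csv bypasses specialized-format detection
emg = EMG.from_file('trigno_export.csv', importer='csv', force_csv=True,
                    sample_frequency=2000.0)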

Source code in emgio/core/emg.py
@classmethod
def from_file(
        cls,
        filepath: str,
        importer: Literal['trigno', 'otb', 'eeglab', 'edf', 'csv', 'wfdb'] | None = None,
        force_csv: bool = False,
        **kwargs
) -> 'EMG':
    """
    The method to create EMG object from file.

    Args:
        filepath: Path to the input file
        importer: Name of the importer to use. Can be one of the following:
            - 'trigno': Delsys Trigno EMG system (CSV)
            - 'otb': OTB/OTB+ EMG system (OTB, OTB+)
            - 'eeglab': EEGLAB .set files (SET)
            - 'edf': EDF/EDF+/BDF/BDF+ format (EDF, BDF)
            - 'csv': Generic CSV (or TXT) files with columnar data
            - 'wfdb': Waveform Database (WFDB)
            If None, the importer will be inferred from the file extension.
            Automatic import is supported for CSV/TXT files.
        force_csv: If True and importer is 'csv', forces using the generic CSV
                  importer even if the file appears to match a specialized format.
        **kwargs: Additional arguments passed to the importer

    Returns:
        EMG: New EMG object with loaded data
    """
    if importer is None:
        importer = cls._infer_importer(filepath)

    importers = {
        'trigno': 'TrignoImporter',  # CSV with Delsys Trigno Headers
        'otb': 'OTBImporter',  # OTB/OTB+ EMG system data
        'edf': 'EDFImporter',  # EDF/EDF+/BDF format
        'eeglab': 'EEGLABImporter',  # EEGLAB .set files
        'csv': 'CSVImporter',  # Generic CSV/Text files
        'wfdb': 'WFDBImporter'  # Waveform Database format
    }

    if importer not in importers:
        raise ValueError(
            f"Unsupported importer: {importer}. "
            f"Available importers: {list(importers.keys())}\n"
            "- trigno: Delsys Trigno EMG system\n"
            "- otb: OTB/OTB+ EMG system\n"
            "- edf: EDF/EDF+/BDF format\n"
            "- eeglab: EEGLAB .set files\n"
            "- csv: Generic CSV/Text files\n"
            "- wfdb: Waveform Database"
        )

    # If using CSV importer and force_csv is set, pass it as force_generic
    if importer == 'csv':
        kwargs['force_generic'] = force_csv

    # Import the appropriate importer class
    importer_module = __import__(
        f'emgio.importers.{importer}',
        globals(),
        locals(),
        [importers[importer]]
    )
    importer_class = getattr(importer_module, importers[importer])

    # Create importer instance and load data
    return importer_class().load(filepath, **kwargs)
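
For example (file names are placeholders):

# Infer the importer from the file extension
emg = EMG.from_file('session.edf')

# Force the generic CSV importer even for files that look like a
# specialized export (force_csv is passed through as force_generic)
emg = EMG.from_file('export.csv', importer='csv', force_csv=True)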

get_channel_types()

Get list of unique channel types in the data.

Returns: List of channel types (e.g., ['EMG', 'ACC', 'GYRO'])

Source code in emgio/core/emg.py
def get_channel_types(self) -> List[str]:
    """
    Get list of unique channel types in the data.

    Returns:
        List of channel types (e.g., ['EMG', 'ACC', 'GYRO'])
    """
    return list(set(info['channel_type'] for info in self.channels.values()))

get_channels_by_type(channel_type)

Get list of channels of a specific type.

Args: channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

Returns: List of channel names of the specified type

Source code in emgio/core/emg.py
def get_channels_by_type(self, channel_type: str) -> List[str]:
    """
    Get list of channels of a specific type.

    Args:
        channel_type: Type of channels to get ('EMG', 'ACC', 'GYRO', etc.)

    Returns:
        List of channel names of the specified type
    """
    return [ch for ch, info in self.channels.items()
            if info['channel_type'] == channel_type]
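
These two methods are typically used together, for example:

types = emg.get_channel_types()            # e.g. ['EMG', 'ACC']
emg_channels = emg.get_channels_by_type('EMG')
print(f"{len(emg_channels)} EMG channels: {emg_channels}")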

get_metadata(key)

Get metadata value.

Args: key: Metadata key

Returns: Value associated with the key, or None if the key is not present

Source code in emgio/core/emg.py
def get_metadata(self, key: str) -> Any:
    """
    Get metadata value.

    Args:
        key: Metadata key

    Returns:
        Value associated with the key, or None if the key is not present
    """
    return self.metadata.get(key)

plot_signals(channels=None, time_range=None, offset_scale=0.8, uniform_scale=True, detrend=False, grid=True, title=None, show=True, plt_module=None)

Plot EMG signals in a single plot with vertical offsets.

Args:
    channels: List of channels to plot. If None, plot all channels.
    time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
    offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
    uniform_scale: Whether to use the same scale for all signals.
    detrend: Whether to remove mean from signals before plotting.
    grid: Whether to show grid lines.
    title: Optional title for the figure.
    show: Whether to display the plot.
    plt_module: Matplotlib pyplot module to use.

Source code in emgio/core/emg.py
def plot_signals(self, channels=None, time_range=None, offset_scale=0.8,
                uniform_scale=True, detrend=False, grid=True, title=None,
                show=True, plt_module=None):
    """
    Plot EMG signals in a single plot with vertical offsets.

    Args:
        channels: List of channels to plot. If None, plot all channels.
        time_range: Tuple of (start_time, end_time) to plot. If None, plot all data.
        offset_scale: Portion of allocated space each signal can use (0.0 to 1.0).
        uniform_scale: Whether to use the same scale for all signals.
        detrend: Whether to remove mean from signals before plotting.
        grid: Whether to show grid lines.
        title: Optional title for the figure.
        show: Whether to display the plot.
        plt_module: Matplotlib pyplot module to use.
    """
    # Delegate to the static plotting function in visualization module
    static_plot_signals(
        emg_object=self,
        channels=channels,
        time_range=time_range,
        offset_scale=offset_scale,
        uniform_scale=uniform_scale,
        detrend=detrend,
        grid=grid,
        title=title,
        show=show,
        plt_module=plt_module
    )
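
A typical call (channel names and time range are illustrative):

# Plot only the EMG channels over the first five seconds,
# with the per-channel mean removed
emg.plot_signals(channels=emg.get_channels_by_type('EMG'),
                 time_range=(0.0, 5.0),
                 detrend=True,
                 title='EMG channels, 0-5 s')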

select_channels(channels=None, channel_type=None, inplace=False)

Select specific channels from the data and return a new EMG object.

Args:
    channels: Channel name or list of channel names to select. If None and
        channel_type is specified, selects all channels of that type.
    channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.).
        If specified with channels, filters the selection to only channels
        of this type.
    inplace: If True, apply the selection to this EMG object and return it;
        if False (default), return a new EMG object.

Returns: EMG: EMG object containing only the selected channels (self when inplace=True, otherwise a new object)

Examples:

# Select specific channels
new_emg = emg.select_channels(['EMG1', 'ACC1'])

# Select all EMG channels
emg_only = emg.select_channels(channel_type='EMG')

# Select specific EMG channels only; 'ACC1' is filtered out by channel_type
emg_subset = emg.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
Source code in emgio/core/emg.py
def select_channels(
        self,
        channels: Union[str, List[str], None] = None,
        channel_type: Optional[str] = None,
        inplace: bool = False) -> 'EMG':
    """
    Select specific channels from the data and return a new EMG object.

    Args:
        channels: Channel name or list of channel names to select. If None and
                channel_type is specified, selects all channels of that type.
        channel_type: Type of channels to select ('EMG', 'ACC', 'GYRO', etc.).
                    If specified with channels, filters the selection to only
                    channels of this type.
        inplace: If True, apply the selection to this EMG object and return
                 self; if False (default), return a new EMG object.

    Returns:
        EMG: EMG object containing only the selected channels (self when
             inplace=True, otherwise a new object)

    Examples:
        # Select specific channels
        new_emg = emg.select_channels(['EMG1', 'ACC1'])

        # Select all EMG channels
        emg_only = emg.select_channels(channel_type='EMG')

        # Select specific EMG channels only; 'ACC1' is filtered out by channel_type
        emg_subset = emg.select_channels(['EMG1', 'ACC1'], channel_type='EMG')
    """
    if self.signals is None:
        raise ValueError("No signals loaded")

    # If channel_type specified but no channels, select all of that type
    if channels is None and channel_type is not None:
        channels = [ch for ch, info in self.channels.items()
                    if info['channel_type'] == channel_type]
        if not channels:
            raise ValueError(f"No channels found of type: {channel_type}")
    elif isinstance(channels, str):
        channels = [channels]

    # Guard against calling with neither channels nor channel_type
    if channels is None:
        raise ValueError("Either channels or channel_type must be specified")

    # Validate channels exist
    if not all(ch in self.signals.columns for ch in channels):
        missing = [ch for ch in channels if ch not in self.signals.columns]
        raise ValueError(f"Channels not found: {missing}")

    # Filter by type if specified
    if channel_type is not None:
        channels = [ch for ch in channels
                    if self.channels[ch]['channel_type'] == channel_type]
        if not channels:
            raise ValueError(
                f"None of the selected channels are of type: {channel_type}")

    # Create new EMG object
    new_emg = EMG()

    # Copy selected signals and channels
    new_emg.signals = self.signals[channels].copy()
    new_emg.channels = {ch: self.channels[ch].copy() for ch in channels}

    # Copy metadata
    new_emg.metadata = self.metadata.copy()

    if not inplace:
        return new_emg
    else:
        self.signals = new_emg.signals
        self.channels = new_emg.channels
        self.metadata = new_emg.metadata
        return self
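
In addition to the docstring examples above, the inplace flag controls whether the selection mutates the current object:

# Returns a new EMG object; the original is untouched
emg_only = emg.select_channels(channel_type='EMG')

# Applies the selection to emg itself and returns it
emg.select_channels(channel_type='EMG', inplace=True)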

set_metadata(key, value)

Set metadata value.

Args:
    key: Metadata key
    value: Metadata value

Source code in emgio/core/emg.py
def set_metadata(self, key: str, value: Any) -> None:
    """
    Set metadata value.

    Args:
        key: Metadata key
        value: Metadata value
    """
    self.metadata[key] = value
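
A quick round trip (keys and values are placeholders):

emg.set_metadata('subject', 'S01')
emg.set_metadata('session', 2)
print(emg.get_metadata('subject'))   # 'S01'
print(emg.get_metadata('missing'))   # None: lookups use dict.get()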

to_edf(filepath, method='both', fft_noise_range=None, svd_rank=None, precision_threshold=0.01, format='auto', bypass_analysis=None, verify=False, verify_tolerance=1e-06, verify_channel_map=None, verify_plot=False, events_df=None, **kwargs)

Export EMG data to EDF/BDF format, optionally including events.

Args:
    filepath: Path to save the EDF/BDF file
    method: Method for signal analysis ('svd', 'fft', or 'both')
        'svd': Uses Singular Value Decomposition for noise floor estimation
        'fft': Uses Fast Fourier Transform for noise floor estimation
        'both': Uses both methods and takes the minimum noise floor (default)
    fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
    svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
    precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
    format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
        If 'edf' or 'bdf' is specified, that format will be used directly.
        If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based on
        signal analysis to minimize precision loss while preferring EDF if sufficient.
    bypass_analysis: If True, skip the signal analysis step when format is explicitly
        set to 'edf' or 'bdf'. If None (default), analysis is skipped automatically
        when the format is forced. Set to False to force analysis even with a
        specified format. Ignored if format='auto'.
    verify: If True, reload the exported file and compare signals with the original
        to check for data integrity loss. Results are printed. (default: False)
    verify_tolerance: Absolute tolerance used when comparing signals during
        verification. (default: 1e-6)
    verify_channel_map: Optional dictionary mapping original channel names (keys)
        to reloaded channel names (values) for verification. Used if verify is True
        and channel names might differ.
    verify_plot: If True and verify is True, plots a comparison of original vs
        reloaded signals.
    events_df: Optional DataFrame with events ('onset', 'duration', 'description').
        If None, uses self.events.
    **kwargs: Additional arguments for the EDF exporter

Returns: Union[Dict, None]: If verify is True, returns a dictionary with verification results. Otherwise, returns None.

Raises: ValueError: If no signals are loaded

Source code in emgio/core/emg.py
def to_edf(self, filepath: str, method: str = 'both',
           fft_noise_range: tuple = None, svd_rank: int = None,
           precision_threshold: float = 0.01,
           format: Literal['auto', 'edf', 'bdf'] = 'auto',
           bypass_analysis: bool | None = None,
           verify: bool = False, verify_tolerance: float = 1e-6,
           verify_channel_map: Optional[Dict[str, str]] = None,
           verify_plot: bool = False,
           events_df: Optional[pd.DataFrame] = None,
           **kwargs
           ) -> Union[Dict, None]:
    """
    Export EMG data to EDF/BDF format, optionally including events.

    Args:
        filepath: Path to save the EDF/BDF file
        method: Method for signal analysis ('svd', 'fft', or 'both')
            'svd': Uses Singular Value Decomposition for noise floor estimation
            'fft': Uses Fast Fourier Transform for noise floor estimation
            'both': Uses both methods and takes the minimum noise floor (default)
        fft_noise_range: Optional tuple (min_freq, max_freq) specifying frequency range for noise in FFT method
        svd_rank: Optional manual rank cutoff for signal/noise separation in SVD method
        precision_threshold: Maximum acceptable precision loss percentage (default: 0.01%)
        format: Format to use ('auto', 'edf', or 'bdf'). Default is 'auto'.
                If 'edf' or 'bdf' is specified, that format will be used directly.
                If 'auto', the format (EDF/16-bit or BDF/24-bit) is chosen based
                on signal analysis to minimize precision loss while preferring EDF
                if sufficient.
        bypass_analysis: If True, skip signal analysis step when format is explicitly
                         set to 'edf' or 'bdf'. If None (default), analysis is skipped
                         automatically when format is forced. Set to False to force
                         analysis even with a specified format. Ignored if format='auto'.
        verify: If True, reload the exported file and compare signals with the original
                to check for data integrity loss. Results are printed. (default: False)
        verify_tolerance: Absolute tolerance used when comparing signals during verification. (default: 1e-6)
        verify_channel_map: Optional dictionary mapping original channel names (keys)
                            to reloaded channel names (values) for verification.
                            Used if `verify` is True and channel names might differ.
        verify_plot: If True and verify is True, plots a comparison of original vs reloaded signals.
        events_df: Optional DataFrame with events ('onset', 'duration', 'description').
                  If None, uses self.events. (This provides flexibility)
        **kwargs: Additional arguments for the EDF exporter

    Returns:
        Union[Dict, None]: If verify is True, returns a dictionary with verification
                           results. Otherwise, returns None.

    Raises:
        ValueError: If no signals are loaded
    """
    from ..exporters.edf import EDFExporter  # Local import

    if self.signals is None:
        raise ValueError("No signals loaded")

    # --- Determine if analysis should be bypassed ---
    final_bypass_analysis = False
    if format.lower() == 'auto':
        if bypass_analysis is True:
            logging.warning("bypass_analysis=True ignored because format='auto'. Analysis is required.")
        # Analysis is always needed for 'auto' format
        final_bypass_analysis = False
    elif format.lower() in ['edf', 'bdf']:
        if bypass_analysis is None:
            # Default behaviour: skip analysis if format is forced
            final_bypass_analysis = True
            msg = (f"Format forced to '{format}'. Skipping signal analysis for faster export. "
                   "Set bypass_analysis=False to force analysis.")
            logging.log(logging.CRITICAL, msg)
        elif bypass_analysis is True:
            final_bypass_analysis = True
            logging.log(logging.CRITICAL, "bypass_analysis=True set. Skipping signal analysis.")
        else:  # bypass_analysis is False
            final_bypass_analysis = False
            logging.info(f"Format forced to '{format}' but bypass_analysis=False. Performing signal analysis.")
    else:
        # Should not happen if Literal type hint works, but good practice
        logging.warning(f"Unknown format '{format}'. Defaulting to 'auto' behavior (analysis enabled).")
        format = 'auto'
        final_bypass_analysis = False

    # Determine which events DataFrame to use
    if events_df is None:
        events_to_export = self.events
    else:
        events_to_export = events_df

    # Combine parameters
    all_params = {
        'precision_threshold': precision_threshold,
        'method': method,
        'fft_noise_range': fft_noise_range,
        'svd_rank': svd_rank,
        'format': format,
        'bypass_analysis': final_bypass_analysis,
        'events_df': events_to_export,  # Pass the events dataframe
        **kwargs
    }

    EDFExporter.export(self, filepath, **all_params)

    verification_report_dict = None
    if verify:
        logging.info(f"Verification requested. Reloading exported file: {filepath}")
        try:
            # Reload the exported file
            reloaded_emg = EMG.from_file(filepath, importer='edf')

            logging.info("Comparing original signals with reloaded signals...")
            # Compare signals using the imported function
            verification_results = compare_signals(
                self,
                reloaded_emg,
                tolerance=verify_tolerance,
                channel_map=verify_channel_map
            )

            # Generate and log report using the imported function
            report_verification_results(verification_results, verify_tolerance)
            verification_report_dict = verification_results

            # Plot comparison using imported function if requested
            summary = verification_results.get('channel_summary', {})
            comparison_mode = summary.get('comparison_mode', 'unknown')
            compared_count = sum(1 for k in verification_results if k != 'channel_summary')

            if verify_plot and compared_count > 0 and comparison_mode != 'failed':
                plot_comparison(self, reloaded_emg, channel_map=verify_channel_map)
            elif verify_plot:
                logging.warning("Skipping verification plot: No channels were successfully compared.")

        except Exception as e:
            logging.error(f"Verification failed during reload or comparison: {e}")
            verification_report_dict = {
                'error': str(e),
                'channel_summary': {'comparison_mode': 'failed'}
            }

    return verification_report_dict
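
Typical export calls (output paths are placeholders):

# Let signal analysis choose between EDF (16-bit) and BDF (24-bit)
emg.to_edf('output.edf')

# Force BDF; analysis is skipped by default when the format is forced
emg.to_edf('output.bdf', format='bdf')

# Export, then reload and compare signals to check integrity
report = emg.to_edf('output.edf', verify=True, verify_tolerance=1e-6)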

Usage Example

from emgio import EMG
from emgio.importers.csv import CSVImporter

# Method 1: Using EMG.from_file (recommended)
emg = EMG.from_file('data.csv', importer='csv')

# Method 2: Using the importer directly
importer = CSVImporter()
emg = importer.load('data.csv', has_header=True, delimiter=',')

Auto-Detection Features

The CSV importer includes several auto-detection capabilities:

  • Format Detection: Recognizes specialized formats like Trigno CSV files
  • Delimiter Detection: Identifies the most common delimiter (comma, tab, semicolon)
  • Header Detection: Determines if the first row is a header based on content
  • Time Column Detection: Looks for columns that might represent time
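
The actual detection logic lives in emgio/importers/csv.py. As a rough illustration of the idea only (not the library's code), delimiter and header detection can be approximated with the standard library's csv.Sniffer:

import csv

def sniff_csv(filepath: str) -> tuple[str, bool]:
    """Illustrative sketch: guess delimiter and header presence."""
    with open(filepath, newline='') as f:
        sample = f.read(4096)          # a small sample is usually enough
    sniffer = csv.Sniffer()
    delimiter = sniffer.sniff(sample, delimiters=',;\t').delimiter
    has_header = sniffer.has_header(sample)
    return delimiter, has_header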

Parameters

  • filepath (str): Path to the CSV file
  • kwargs (dict): Additional keyword arguments
  • sample_frequency (float, optional): Sampling frequency in Hz (required if no time column)
  • has_header (bool, optional): Whether file has a header row (auto-detected if not specified)
  • skiprows (int, optional): Number of rows to skip at beginning (auto-detected if not specified)
  • delimiter (str, optional): Column delimiter (auto-detected if not specified)
  • time_column (str or int, optional): Name or index of column to use as time index (auto-detected if not specified)
  • columns (list, optional): List of column names or indices to include
  • channel_names (list, optional): Custom names for channels
  • channel_types (dict, optional): Dict mapping column names to channel types ('EMG', 'ACC', etc.)
  • physical_dimensions (dict, optional): Dict mapping column names to physical dimensions
  • metadata (dict, optional): Dict of additional metadata to include
  • force_csv (bool, optional): Force using generic CSV importer even if specialized format is detected
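
For instance, combining several of these parameters (column names here are placeholders):

# Use the 'Time' column as index and import only two signal columns
emg = EMG.from_file('data.csv', importer='csv',
                    time_column='Time',
                    columns=['Biceps', 'Triceps'],
                    channel_types={'Biceps': 'EMG', 'Triceps': 'EMG'},
                    physical_dimensions={'Biceps': 'mV', 'Triceps': 'mV'})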

Return Values

The load() method returns an EMG object whose main attributes are:

  1. signals (pandas.DataFrame): Signal data with channels as columns and time as index
  2. channels (dict): Dictionary of per-channel information, including:
     • channel_type: Type of channel (EMG, EEG, etc.)
     • physical_dimension: Physical unit (e.g., 'mV', 'g')
     • sample_frequency: Sampling rate in Hz
  3. metadata (dict): Metadata from the file plus any additional metadata provided
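
Accessing these attributes after loading ('EMG1' is a placeholder channel name):

emg = EMG.from_file('data.csv', importer='csv')
print(emg.signals.head())     # time-indexed DataFrame of all channels
print(emg.channels['EMG1'])   # e.g. {'channel_type': 'EMG', 'physical_dimension': 'mV', ...}
print(emg.metadata)           # file-level metadata dictionary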

Implementation Details

The CSV importer uses pandas to:

  1. Detect the format and structure of the CSV file
  2. Extract time information if available or generate a time index based on sample frequency
  3. Convert column data to appropriate formats
  4. Apply channel labeling and typing based on provided information
  5. Construct a pandas DataFrame with the signal data
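
Step 2 mirrors the add_channel logic shown earlier: when no time column exists, the index is derived from the sample frequency alone:

import numpy as np

n_samples = 10_000
sample_frequency = 2000.0                        # Hz, user-supplied
time = np.arange(n_samples) / sample_frequency   # index in seconds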

Examples

Basic CSV with Headers

# Load CSV with automatic format detection
emg = EMG.from_file('data.csv', importer='csv')

Headerless CSV with Custom Names

# Load headerless CSV with custom channel names
emg = EMG.from_file('data.csv', importer='csv',
                   has_header=False,
                   sample_frequency=1000,  # Required since no time column
                   channel_names=['EMG_L', 'EMG_R', 'ACC_X'])

Setting Channel Types and Units

# Specify channel types and physical dimensions
emg = EMG.from_file('data.csv', importer='csv',
                   channel_types={
                       'EMG1': 'EMG',
                       'EMG2': 'EMG',
                       'ACC1': 'ACC'
                   },
                   physical_dimensions={
                       'EMG1': 'mV',
                       'EMG2': 'mV',
                       'ACC1': 'g'
                   })