Skip to content

Dataset Module

energy_gnome.dataset.CathodeDatabase

Bases: BaseDatabase

Source code in energy_gnome/dataset/cathodes.py
Python
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
class CathodeDatabase(BaseDatabase):
    def __init__(
        self, data_dir: Path = DATA_DIR, working_ion: str = "Li", battery_type: str = "insertion"
    ):
        """
        Initialize the CathodeDatabase with a root data directory and processing stage.

        Sets up the directory structure for storing data across different processing stages
        (`raw/`, `processed/`, `final/`) and initializes placeholders for database paths and data.

        Args:
            data_dir (Path, optional): Root directory path for storing data.
                                       Defaults to DATA_DIR from config.
            working_ion (str, optional): The working ion used in the dataset (e.g., 'Li').
                                         Defaults to "Li".
            battery_type (str, optional): The type of battery type (e.g., 'insertion', 'conversion').
                                          Defaults to "insertion".

        Raises:
            NotImplementedError: If the specified processing stage is not supported.
            ImmutableRawDataError: If attempting to set an unsupported processing stage.
        """
        super().__init__(data_dir=data_dir)
        self.working_ion = working_ion

        if battery_type == "insertion":
            self.battery_type = battery_type
        elif battery_type == "conversion":
            logger.error("`conversion` battery type is not yet implemented in Material Project.")
            raise NotImplementedError(
                "`conversion` battery type is not yet present in Material Project."
            )
        else:
            logger.error(
                f"Invalid battery type: {battery_type}. Must be 'insertion' or 'conversion'."
            )
            raise ValueError(
                "`battery_type` can be only `insertion` or `conversion` (not yet implemented)"
            )

        # Initialize directories, paths, and databases for each stage
        self.database_directories = {
            stage: self.data_dir / stage / "cathodes" / battery_type / working_ion
            for stage in self.processing_stages
        }
        for stage_dir in self.database_directories.values():
            stage_dir.mkdir(parents=True, exist_ok=True)

        self.database_paths = {
            stage: dir_path / "database.json"
            for stage, dir_path in self.database_directories.items()
        }

        self.databases = {stage: pd.DataFrame() for stage in self.processing_stages}
        self._battery_models = pd.DataFrame()

    def retrieve_remote(self, mute_progress_bars: bool = True) -> pd.DataFrame:
        """
        Retrieve models from the Material Project API.

        Wrapper method to call `retrieve_models`.

        Args:
            mute_progress_bars (bool, optional):
                If `True`, mutes the Material Project API progress bars.
                Defaults to `True`.

        Returns:
            pd.DataFrame: DataFrame containing the retrieved models.
        """
        return self.retrieve_models(mute_progress_bars=mute_progress_bars)

    def retrieve_models(self, mute_progress_bars: bool = True) -> pd.DataFrame:
        """
        Retrieve battery models from the Materials Project API.

        Connects to the Material Project API using MPRester, queries for materials
        based on the working ion and processing stage, and retrieves the specified fields.
        Cleans the data by removing entries with missing critical identifiers.

        Args:
            mute_progress_bars (bool, optional):
                If `True`, mutes the Material Project API progress bars.
                Defaults to `True`.

        Returns:
            pd.DataFrame: DataFrame containing the retrieved and cleaned models.

        Raises:
            Exception: If the API query fails.
        """
        mp_api_key = get_mp_api_key()
        logger.debug("MP querying for insertion battery models.")

        with MPRester(mp_api_key, mute_progress_bars=mute_progress_bars) as mpr:
            try:
                query = mpr.materials.insertion_electrodes.search(
                    working_ion=self.working_ion, fields=BAT_FIELDS
                )
                logger.info(
                    f"MP query successful, {len(query)} {self.working_ion}-ion batteries found."
                )
            except Exception as e:
                raise e
        logger.debug("Converting MP query results into DataFrame.")
        battery_models_database = convert_my_query_to_dataframe(
            query, mute_progress_bars=mute_progress_bars
        )

        # Fast cleaning
        logger.debug("Removing NaN")
        battery_models_database = battery_models_database.dropna(
            axis=0, how="any", subset=["id_charge", "id_discharge"]
        )
        battery_models_database = battery_models_database.dropna(axis=1, how="all")
        self._battery_models = battery_models_database
        logger.success(f"{self.working_ion}-ion batteries model retrieved successfully.")
        return self._battery_models

    def compare_databases(self, new_db: pd.DataFrame, stage: str) -> pd.DataFrame:
        """
        Compare two databases and identify new entry IDs.

        Args:
            new_db (pd.DataFrame): New database to compare.
            stage (str): Processing stage ("raw", "processed", "final").

        Returns:
            pd.DataFrame: Subset of `new_db` containing only new entry IDs.
        """
        old_db = self.load_database(stage=stage)
        if not old_db.empty:
            new_ids_set = set(new_db["battery_id"])
            old_ids_set = set(old_db["battery_id"])
            new_ids_only = new_ids_set - old_ids_set
            logger.debug(f"Found {len(new_ids_only)} new battery IDs in the new database.")
            return new_db[new_db["battery_id"].isin(new_ids_only)]
        else:
            logger.warning("Nothing to compare here...")
            return new_db

    def backup_and_changelog(
        self,
        old_db: pd.DataFrame,
        new_db: pd.DataFrame,
        differences: pd.Series,
        stage: str,
    ) -> None:
        """
        Backup the old database and update the changelog with identified differences.

        Creates a backup of the existing database and appends a changelog entry detailing
        the differences between the old and new databases. The changelog includes
        information such as entry identifiers, formulas, and last updated timestamps.

        Args:
            old_db (pd.DataFrame): The existing database before updates.
            new_db (pd.DataFrame): The new database containing updates.
            differences (pd.Series): Series of identifiers that are new or updated.
            stage (str): The processing stage ('raw', 'processed', 'final').
        """
        if stage not in self.processing_stages:
            logger.error(f"Invalid stage: {stage}. Must be one of {self.processing_stages}.")
            raise ValueError(f"stage must be one of {self.processing_stages}.")

        backup_path = self.database_directories[stage] / "old_database.json"
        try:
            old_db.to_json(backup_path)
            logger.debug(f"Old database backed up to {backup_path}")
        except Exception as e:
            logger.error(f"Failed to backup old database to {backup_path}: {e}")
            raise OSError(f"Failed to backup old database to {backup_path}: {e}") from e

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        changelog_path = self.database_directories[stage] / "changelog.txt"
        changelog_entries = [
            f"= Change Log - {timestamp} ".ljust(70, "=") + "\n",
            "Difference old_database.json VS database.json\n",
            f"{'ID':<15}{'Formula':<30}{'Last Updated (MP)':<25}\n",
            "-" * 70 + "\n",
        ]
        # Tailoring respect father class
        for identifier in differences["battery_id"]:
            row = new_db.loc[new_db["battery_id"] == identifier]
            if not row.empty:
                formula = row["battery_formula"].values[0]
                last_updated = row["last_updated"].values[0]
            else:
                formula = "N/A"
                last_updated = "N/A"
            changelog_entries.append(f"{identifier:<15}{formula:<30}{last_updated:<20}\n")

        try:
            with open(changelog_path, "a") as file:
                file.writelines(changelog_entries)
            logger.debug(f"Changelog updated at {changelog_path} with {len(differences)} changes.")
        except Exception as e:
            logger.error(f"Failed to update changelog at {changelog_path}: {e}")
            raise OSError(f"Failed to update changelog at {changelog_path}: {e}") from e

    def compare_and_update(self, new_db: pd.DataFrame, stage: str) -> pd.DataFrame:
        """
        Compare and update the database with new entries.

        Identifies new entries and updates the database accordingly. Ensures that raw data
        remains immutable by preventing updates unless explicitly allowed.

        Args:
            new_db (pd.DataFrame): New database to compare.
            stage (str): Processing stage ("raw", "processed", "final").

        Returns:
            pd.DataFrame: Updated database containing new entries.

        Raises:
            ImmutableRawDataError: If attempting to modify immutable raw data.
        """
        old_db = self.load_database(stage=stage)
        db_diff = self.compare_databases(new_db, stage)
        if not db_diff.empty:
            logger.warning(f"The new database contains {len(db_diff)} new items.")

            if stage == "raw" and not self._update_raw:
                logger.error("Raw data must be treated as immutable!")
                logger.error(
                    "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
                )
                raise ImmutableRawDataError(
                    "Raw data must be treated as immutable!\n"
                    "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
                )
            else:
                if stage == "raw":
                    logger.info(
                        "Be careful you are changing the raw data which must be treated as immutable!"
                    )
                logger.info(
                    f"Updating the {stage} data and saving it in {self.database_paths[stage]}."
                )
                self.backup_and_changelog(
                    old_db,
                    new_db,
                    db_diff,
                    stage,
                )
                self.databases[stage] = new_db
                self.save_database(stage)
        else:
            logger.info("No new items found. No update required.")

    def retrieve_materials(
        self, stage: str, charge_state: str, mute_progress_bars: bool = True
    ) -> list[Any]:
        """
        Retrieve material structures from the Material Project API.

        Fetches material structures based on the processing stage and charge state.

        Args:
            stage (str): Processing stage ('raw', 'processed', 'final').
            charge_state (str): Cathode charge state ('charge', 'discharge').
            mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

        Returns:
            List[Any]: List of retrieved material objects.

        Raises:
            ValueError: If the charge_state is invalid.
            MissingData: If the required data is missing in the database.
        """
        if charge_state not in ["charge", "discharge"]:
            logger.error(f"Invalid charge_state: {charge_state}. Must be 'charge' or 'discharge'.")
            raise ValueError("charge_state must be 'charge' or 'discharge'.")

        material_ids = self.databases[stage][f"id_{charge_state}"].tolist()
        if not material_ids:
            logger.warning(
                f"No material IDs found for stage '{stage}' and charge_state '{charge_state}'."
            )
            raise MissingData(
                f"No material IDs found for stage '{stage}' and charge_state '{charge_state}'."
            )

        logger.debug(
            f"Retrieving materials for stage '{stage}' and charge_state '{charge_state}'."
        )
        query = get_material_by_id(
            material_ids,
            mute_progress_bars=mute_progress_bars,
        )
        return query

    def _add_materials_properties_columns(self, stage: str, charge_state: str) -> pd.DataFrame:
        """
        Add material properties columns to the database for a given cathode state.

        Args:
            stage (str): Processing stage ('raw', 'processed', 'final').
            charge_state (str): Cathode charge state ('charge', 'discharge').

        Returns:
            pd.DataFrame: Updated database with material properties columns.

        Raises:
            ImmutableRawDataError: If attempting to modify immutable raw data.
        """
        if charge_state not in ["charge", "discharge"]:
            logger.error(f"Invalid charge_state: {charge_state}. Must be 'charge' or 'discharge'.")
            raise ValueError("charge_state must be 'charge' or 'discharge'.")

        if stage == "raw" and not self._update_raw:
            logger.error("Raw data must be treated as immutable!")
            logger.error(
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
            raise ImmutableRawDataError(
                "Raw data must be treated as immutable!\n"
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
        else:
            if stage == "raw":
                logger.info(
                    "Be careful you are changing the raw data which must be treated as immutable!"
                )
            logger.debug(
                f"Adding material properties to {stage} data for cathode state: {charge_state}"
            )
            for property_name, dtype in MAT_PROPERTIES.items():
                column_name = f"{charge_state}_{property_name}"
                if column_name not in self.databases[stage].columns:
                    logger.debug(f"Adding missing column: {column_name} with dtype {dtype}")
                    self.databases[stage][column_name] = pd.Series(dtype=dtype)

    def add_material_properties(
        self,
        stage: str,
        materials_mp_query: list,
        charge_state: str,
        mute_progress_bars: bool = True,
    ) -> pd.DataFrame:
        """
        Add material properties to the database from Material Project query results.

        Saves CIF files for each material in the query and updates the database with file paths and properties.

        Args:
            stage (str): Processing stage ('raw', 'processed', 'final').
            materials_mp_query (List[Any]): List of material query results.
            charge_state (str): The state of the cathode ('charge' or 'discharge').
            mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

        Returns:
            pd.DataFrame: Updated database with material properties.

        Raises:
            ImmutableRawDataError: If attempting to modify immutable raw data.
            KeyError: If a material ID is not found in the database.
        """
        if stage == "raw" and not self._update_raw:
            logger.error("Raw data must be treated as immutable!")
            logger.error(
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
            raise ImmutableRawDataError(
                "Raw data must be treated as immutable!\n"
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
        else:
            if stage == "raw":
                logger.info(
                    "Be careful you are changing the raw data which must be treated as immutable!"
                )
            logger.debug(
                f"Adding material properties to {stage} data for cathode state: {charge_state}"
            )

            # Ensure necessary columns are present
            self._add_materials_properties_columns(stage, self.databases[stage], charge_state)

            for material in tqdm(
                materials_mp_query,
                desc=f"Adding {charge_state} cathodes properties",
                disable=mute_progress_bars,
            ):
                try:
                    # Locate the row in the database corresponding to the material ID
                    i_row = (
                        self.databases[stage]
                        .index[self.databases[stage][f"id_{charge_state}"] == material.material_id]
                        .tolist()[0]
                    )

                    # Assign material properties to the database
                    for property_name in MAT_PROPERTIES.keys():
                        self.databases[stage].at[i_row, f"{charge_state}_{property_name}"] = (
                            getattr(material, property_name, None)
                        )
                except IndexError:
                    logger.error(f"Material ID {material.material_id} not found in the database.")
                    raise MissingData(
                        f"Material ID {material.material_id} not found in the database."
                    )
                except Exception as e:
                    logger.error(
                        f"Failed to add properties for Material ID {material.material_id}: {e}"
                    )
                    raise e

        logger.info(f"Material properties for '{charge_state}' cathodes added successfully.")

    def save_cif_files(
        self,
        stage: str,
        materials_mp_query: list,
        charge_state: str,
        mute_progress_bars: bool = True,
    ) -> None:
        """
        Save CIF files for materials and update the database accordingly.

        Manages the saving of CIF files for each material and updates the database with
        the file paths and relevant properties. Ensures that raw data remains immutable.

        Args:
            stage (str): Processing stage ('raw', 'processed', 'final').
            materials_mp_query (List[Any]): List of material query results.
            charge_state (str): The charge state of the cathode ('charge' or 'discharge').
            mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

        Raises:
            ImmutableRawDataError: If attempting to modify immutable raw data.
        """

        saving_dir = self.database_directories[stage] / charge_state

        if stage == "raw" and not self._update_raw:
            logger.error("Raw data must be treated as immutable!")
            logger.error(
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
            raise ImmutableRawDataError(
                "Raw data must be treated as immutable!\n"
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
        elif stage == "raw" and saving_dir.exists():
            logger.info(
                "Be careful you are changing the raw data which must be treated as immutable!"
            )

        # Clean the saving directory if it exists
        if saving_dir.exists():
            logger.warning(f"Cleaning the content in {saving_dir}")
            sh.rmtree(saving_dir)

        # Create the saving directory
        saving_dir.mkdir(parents=True, exist_ok=False)
        self.databases[stage][f"{charge_state}_path"] = pd.Series(dtype=str)

        # Save CIF files and update database paths
        for material in tqdm(
            materials_mp_query,
            desc=f"Saving {charge_state} cathodes",
            disable=mute_progress_bars,
        ):
            try:
                # Locate the row in the database corresponding to the material ID
                i_row = (
                    self.databases[stage]
                    .index[self.databases[stage][f"id_{charge_state}"] == material.material_id]
                    .tolist()[0]
                )

                # Define the CIF file path
                cif_path = saving_dir / f"{material.material_id}.cif"

                # Save the CIF file
                material.structure.to(filename=str(cif_path))

                # Update the database with the CIF file path
                self.databases[stage].at[i_row, f"{charge_state}_path"] = str(cif_path)

            except IndexError:
                logger.error(f"Material ID {material.material_id} not found in the database.")
                raise MissingData(f"Material ID {material.material_id} not found in the database.")
            except Exception as e:
                logger.error(f"Failed to save CIF for Material ID {material.material_id}: {e}")
                raise OSError(
                    f"Failed to save CIF for Material ID {material.material_id}: {e}"
                ) from e

        # Save the updated database
        self.save_database(stage)
        logger.info(f"CIF files for stage '{stage}' saved and database updated successfully.")

    def copy_cif_files(
        self,
        stage: str,
        charge_state: str,
        mute_progress_bars: bool = True,
    ) -> None:
        """
        Copy CIF files from the raw stage to another processing stage.

        Copies CIF files corresponding to the specified cathode state from the 'raw'
        processing stage to the target stage. Updates the database with the new file paths.

        Args:
            stage (str): Target processing stage ('processed', 'final').
            charge_state (str): The charge state of the cathode ('charge' or 'discharge').
            mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

        Raises:
            ValueError: If the target stage is 'raw'.
            MissingData: If the source CIF directory does not exist or is empty.
        """
        if stage == "raw":
            logger.error("Stage argument cannot be 'raw'.")
            logger.error("You can only copy from 'raw' to other stages, not to 'raw' itself.")
            raise ValueError("Stage argument cannot be 'raw'.")

        source_dir = self.database_directories["raw"] / charge_state
        saving_dir = self.database_directories[stage] / charge_state

        # Clean the saving directory if it exists
        if saving_dir.exists():
            logger.warning(f"Cleaning the content in {saving_dir}")
            sh.rmtree(saving_dir)

        # Check if source CIF directory exists and is not empty
        if not source_dir.exists() or not any(source_dir.iterdir()):
            logger.warning(
                f"The raw CIF directory does not exist or is empty. Check: {source_dir}"
            )
            raise MissingData(
                f"The raw CIF directory does not exist or is empty. Check: {source_dir}"
            )

        # Create the saving directory
        saving_dir.mkdir(parents=True, exist_ok=False)
        self.databases[stage][f"{charge_state}_path"] = pd.Series(dtype=str)

        # Copy CIF files and update database paths
        for material_id in tqdm(
            self.databases[stage][f"id_{charge_state}"],
            desc=f"Copying {charge_state} cathodes ('raw' -> '{stage}')",
            disable=mute_progress_bars,
        ):
            try:
                # Locate the row in the database corresponding to the material ID
                i_row = (
                    self.databases[stage]
                    .index[self.databases[stage][f"id_{charge_state}"] == material_id]
                    .tolist()[0]
                )

                # Define source and destination CIF file paths
                source_cif_path = source_dir / f"{material_id}.cif"
                cif_path = saving_dir / f"{material_id}.cif"

                # Copy the CIF file
                sh.copyfile(source_cif_path, cif_path)

                # Update the database with the new CIF file path
                self.databases[stage].at[i_row, f"{charge_state}_path"] = str(cif_path)

            except IndexError:
                logger.error(f"Material ID {material_id} not found in the database.")
                raise MissingData(f"Material ID {material_id} not found in the database.")
            except Exception as e:
                logger.error(f"Failed to copy CIF for Material ID {material_id}: {e}")
                raise OSError(f"Failed to copy CIF for Material ID {material_id}: {e}") from e

        # Save the updated database
        self.save_database(stage)
        logger.info(f"CIF files copied to stage '{stage}' and database updated successfully.")

    def __repr__(self) -> str:
        """
        Text representation of the CathodeDatabase instance.
        Used for print() and str() calls.

        Returns:
            str: ASCII table representation of the database
        """
        # Gather information about each stage
        data = {
            "Stage": [],
            "Entries": [],
            "Last Modified": [],
            "Size": [],
            "Storage Path": [],
        }

        # Calculate column widths
        widths = [10, 8, 17, 10, 55]

        for stage in self.processing_stages:
            # Get database info
            db = self.databases[stage]
            path = self.database_paths[stage]

            # Get file modification time and size if file exists
            if path.exists():
                modified = path.stat().st_mtime
                modified_time = pd.Timestamp.fromtimestamp(modified).strftime("%Y-%m-%d %H:%M")
                size = path.stat().st_size / 1024  # Convert to KB
                size_str = f"{size:.1f} KB" if size < 1024 else f"{size / 1024:.1f} MB"
            else:
                modified_time = "Not created"
                size_str = "0 KB"

            path_str = str(path.resolve())
            if len(path_str) > widths[4]:
                path_str = ".." + path_str[len(path_str) - widths[4] + 3 :]

            # Append data
            data["Stage"].append(stage.capitalize())
            data["Entries"].append(len(db))
            data["Last Modified"].append(modified_time)
            data["Size"].append(size_str)
            data["Storage Path"].append(path_str)

        # Create DataFrame
        info_df = pd.DataFrame(data)

        # Text representation for terminal/print
        def create_separator(widths):
            return "+" + "+".join("-" * (w + 1) for w in widths) + "+"

        # Create the text representation
        lines = []

        # Add title
        title = f" {self.__class__.__name__} Summary "
        lines.append(f"\n{title:=^{sum(widths) + len(widths) * 2 + 1}}")

        # Add header
        separator = create_separator(widths)
        lines.append(separator)

        header = (
            "|" + "|".join(f" {col:<{widths[i]}}" for i, col in enumerate(info_df.columns)) + "|"
        )
        lines.append(header)
        lines.append(separator)

        # Add data rows
        for _, row in info_df.iterrows():
            line = "|" + "|".join(f" {str(val):<{widths[i]}}" for i, val in enumerate(row)) + "|"
            lines.append(line)

        # Add bottom separator
        lines.append(separator)

        # Add additional info
        lines.append(f"\nWorking Ion: {self.working_ion}")
        lines.append(f"Battery Type: {self.battery_type}")

        return "\n".join(lines)

    def _repr_html_(self) -> str:
        """
        HTML representation of the CathodeDatabase instance.
        Used for Jupyter notebook display.

        Returns:
            str: HTML representation of the database
        """
        # Gather information about each stage
        data = {
            "Stage": [],
            "Entries": [],
            "Last Modified": [],
            "Size": [],
            "Storage Path": [],
        }

        for stage in self.processing_stages:
            # Get database info
            db = self.databases[stage]
            path = self.database_paths[stage]

            # Get file modification time and size if file exists
            if path.exists():
                modified = path.stat().st_mtime
                modified_time = pd.Timestamp.fromtimestamp(modified).strftime("%Y-%m-%d %H:%M")
                size = path.stat().st_size / 1024  # Convert to KB
                size_str = f"{size:.1f} KB" if size < 1024 else f"{size / 1024:.1f} MB"
            else:
                modified_time = "Not created"
                size_str = "0 KB"

            # Append data
            data["Stage"].append(stage.capitalize())
            data["Entries"].append(len(db))
            data["Last Modified"].append(modified_time)
            data["Size"].append(size_str)
            data["Storage Path"].append(str(path.resolve()))

        # Create DataFrame
        info_df = pd.DataFrame(data)

        # Generate header row
        header_cells = " ".join(
            f'<th style="padding: 12px 15px; text-align: left;">{col}</th>'
            for col in info_df.columns
        )

        # Generate table rows
        table_rows = ""
        for _, row in info_df.iterrows():
            cells = "".join(f'<td style="padding: 12px 15px;">{val}</td>' for val in row)
            table_rows += f"<tr style='border-bottom: 1px solid #e9ecef;'>{cells}</tr>"

        # Create the complete HTML
        html = (
            """<style>
                @media (prefers-color-scheme: dark) {
                    .database-container { background-color: #1e1e1e !important; }
                    .database-title { color: #e0e0e0 !important; }
                    .database-table { background-color: #2d2d2d !important; }
                    .database-header { background-color: #4a4a4a !important; }
                    .database-cell { border-color: #404040 !important; }
                    .database-info { color: #b0b0b0 !important; }
                }
            </style>"""
            '<div style="font-family: Arial, sans-serif; padding: 20px; background:transparent; '
            'border-radius: 8px;">'
            f'<h3 style="color: #58bac7; margin-bottom: 15px;">{self.__class__.__name__}</h3>'
            '<div style="overflow-x: auto;">'
            '<table class="database-table" style="border-collapse: collapse; width: 100%;'
            ' box-shadow: 0 1px 3px rgba(0,0,0,0.1); background:transparent;">'
            # '<table style="border-collapse: collapse; width: 100%; background-color: white; '
            # 'box-shadow: 0 1px 3px rgba(0,0,0,0.1);">'
            "<thead>"
            f'<tr style="background-color: #58bac7; color: white;">{header_cells}</tr>'
            "</thead>"
            f"<tbody>{table_rows}</tbody>"
            "</table>"
            "</div>"
            '<div style="margin-top: 10px; color: #666; font-size: 1.1em;">'
            f"Working Ion: {self.working_ion}<br>"
            f"Battery Type: {self.battery_type}"
            "</div>"
            "</div>"
        )
        return html

__init__(data_dir=DATA_DIR, working_ion='Li', battery_type='insertion')

Initialize the CathodeDatabase with a root data directory and processing stage.

Sets up the directory structure for storing data across different processing stages (raw/, processed/, final/) and initializes placeholders for database paths and data.

Parameters:

Name Type Description Default
data_dir Path

Root directory path for storing data. Defaults to DATA_DIR from config.

DATA_DIR
working_ion str

The working ion used in the dataset (e.g., 'Li'). Defaults to "Li".

'Li'
battery_type str

The type of battery type (e.g., 'insertion', 'conversion'). Defaults to "insertion".

'insertion'

Raises:

Type Description
NotImplementedError

If the specified processing stage is not supported.

ImmutableRawDataError

If attempting to set an unsupported processing stage.

Source code in energy_gnome/dataset/cathodes.py
Python
def __init__(
    self, data_dir: Path = DATA_DIR, working_ion: str = "Li", battery_type: str = "insertion"
):
    """
    Initialize the CathodeDatabase with a root data directory and processing stage.

    Sets up the directory structure for storing data across different processing stages
    (`raw/`, `processed/`, `final/`) and initializes placeholders for database paths and data.

    Args:
        data_dir (Path, optional): Root directory path for storing data.
                                   Defaults to DATA_DIR from config.
        working_ion (str, optional): The working ion used in the dataset (e.g., 'Li').
                                     Defaults to "Li".
        battery_type (str, optional): The type of battery type (e.g., 'insertion', 'conversion').
                                      Defaults to "insertion".

    Raises:
        NotImplementedError: If the specified processing stage is not supported.
        ImmutableRawDataError: If attempting to set an unsupported processing stage.
    """
    super().__init__(data_dir=data_dir)
    self.working_ion = working_ion

    if battery_type == "insertion":
        self.battery_type = battery_type
    elif battery_type == "conversion":
        logger.error("`conversion` battery type is not yet implemented in Material Project.")
        raise NotImplementedError(
            "`conversion` battery type is not yet present in Material Project."
        )
    else:
        logger.error(
            f"Invalid battery type: {battery_type}. Must be 'insertion' or 'conversion'."
        )
        raise ValueError(
            "`battery_type` can be only `insertion` or `conversion` (not yet implemented)"
        )

    # Initialize directories, paths, and databases for each stage
    self.database_directories = {
        stage: self.data_dir / stage / "cathodes" / battery_type / working_ion
        for stage in self.processing_stages
    }
    for stage_dir in self.database_directories.values():
        stage_dir.mkdir(parents=True, exist_ok=True)

    self.database_paths = {
        stage: dir_path / "database.json"
        for stage, dir_path in self.database_directories.items()
    }

    self.databases = {stage: pd.DataFrame() for stage in self.processing_stages}
    self._battery_models = pd.DataFrame()

__repr__()

Text representation of the CathodeDatabase instance. Used for print() and str() calls.

Returns:

Name Type Description
str str

ASCII table representation of the database

Source code in energy_gnome/dataset/cathodes.py
Python
def __repr__(self) -> str:
    """
    Text representation of the CathodeDatabase instance.
    Used for print() and str() calls.

    Returns:
        str: ASCII table representation of the database
    """
    # Gather information about each stage
    data = {
        "Stage": [],
        "Entries": [],
        "Last Modified": [],
        "Size": [],
        "Storage Path": [],
    }

    # Calculate column widths
    widths = [10, 8, 17, 10, 55]

    for stage in self.processing_stages:
        # Get database info
        db = self.databases[stage]
        path = self.database_paths[stage]

        # Get file modification time and size if file exists
        if path.exists():
            modified = path.stat().st_mtime
            modified_time = pd.Timestamp.fromtimestamp(modified).strftime("%Y-%m-%d %H:%M")
            size = path.stat().st_size / 1024  # Convert to KB
            size_str = f"{size:.1f} KB" if size < 1024 else f"{size / 1024:.1f} MB"
        else:
            modified_time = "Not created"
            size_str = "0 KB"

        path_str = str(path.resolve())
        if len(path_str) > widths[4]:
            path_str = ".." + path_str[len(path_str) - widths[4] + 3 :]

        # Append data
        data["Stage"].append(stage.capitalize())
        data["Entries"].append(len(db))
        data["Last Modified"].append(modified_time)
        data["Size"].append(size_str)
        data["Storage Path"].append(path_str)

    # Create DataFrame
    info_df = pd.DataFrame(data)

    # Text representation for terminal/print
    def create_separator(widths):
        return "+" + "+".join("-" * (w + 1) for w in widths) + "+"

    # Create the text representation
    lines = []

    # Add title
    title = f" {self.__class__.__name__} Summary "
    lines.append(f"\n{title:=^{sum(widths) + len(widths) * 2 + 1}}")

    # Add header
    separator = create_separator(widths)
    lines.append(separator)

    header = (
        "|" + "|".join(f" {col:<{widths[i]}}" for i, col in enumerate(info_df.columns)) + "|"
    )
    lines.append(header)
    lines.append(separator)

    # Add data rows
    for _, row in info_df.iterrows():
        line = "|" + "|".join(f" {str(val):<{widths[i]}}" for i, val in enumerate(row)) + "|"
        lines.append(line)

    # Add bottom separator
    lines.append(separator)

    # Add additional info
    lines.append(f"\nWorking Ion: {self.working_ion}")
    lines.append(f"Battery Type: {self.battery_type}")

    return "\n".join(lines)

add_material_properties(stage, materials_mp_query, charge_state, mute_progress_bars=True)

Add material properties to the database from Material Project query results.

Saves CIF files for each material in the query and updates the database with file paths and properties.

Parameters:

Name Type Description Default
stage str

Processing stage ('raw', 'processed', 'final').

required
materials_mp_query List[Any]

List of material query results.

required
charge_state str

The state of the cathode ('charge' or 'discharge').

required
mute_progress_bars bool

Disable progress bar if True. Defaults to True.

True

Returns:

Type Description
DataFrame

pd.DataFrame: Updated database with material properties.

Raises:

Type Description
ImmutableRawDataError

If attempting to modify immutable raw data.

KeyError

If a material ID is not found in the database.

Source code in energy_gnome/dataset/cathodes.py
Python
def add_material_properties(
    self,
    stage: str,
    materials_mp_query: list,
    charge_state: str,
    mute_progress_bars: bool = True,
) -> pd.DataFrame:
    """
    Add material properties to the database from Material Project query results.

    Saves CIF files for each material in the query and updates the database with file paths and properties.

    Args:
        stage (str): Processing stage ('raw', 'processed', 'final').
        materials_mp_query (List[Any]): List of material query results.
        charge_state (str): The state of the cathode ('charge' or 'discharge').
        mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

    Returns:
        pd.DataFrame: Updated database with material properties.

    Raises:
        ImmutableRawDataError: If attempting to modify immutable raw data.
        KeyError: If a material ID is not found in the database.
    """
    if stage == "raw" and not self._update_raw:
        logger.error("Raw data must be treated as immutable!")
        logger.error(
            "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
        )
        raise ImmutableRawDataError(
            "Raw data must be treated as immutable!\n"
            "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
        )
    else:
        if stage == "raw":
            logger.info(
                "Be careful you are changing the raw data which must be treated as immutable!"
            )
        logger.debug(
            f"Adding material properties to {stage} data for cathode state: {charge_state}"
        )

        # Ensure necessary columns are present
        self._add_materials_properties_columns(stage, self.databases[stage], charge_state)

        for material in tqdm(
            materials_mp_query,
            desc=f"Adding {charge_state} cathodes properties",
            disable=mute_progress_bars,
        ):
            try:
                # Locate the row in the database corresponding to the material ID
                i_row = (
                    self.databases[stage]
                    .index[self.databases[stage][f"id_{charge_state}"] == material.material_id]
                    .tolist()[0]
                )

                # Assign material properties to the database
                for property_name in MAT_PROPERTIES.keys():
                    self.databases[stage].at[i_row, f"{charge_state}_{property_name}"] = (
                        getattr(material, property_name, None)
                    )
            except IndexError:
                logger.error(f"Material ID {material.material_id} not found in the database.")
                raise MissingData(
                    f"Material ID {material.material_id} not found in the database."
                )
            except Exception as e:
                logger.error(
                    f"Failed to add properties for Material ID {material.material_id}: {e}"
                )
                raise e

    logger.info(f"Material properties for '{charge_state}' cathodes added successfully.")

backup_and_changelog(old_db, new_db, differences, stage)

Backup the old database and update the changelog with identified differences.

Creates a backup of the existing database and appends a changelog entry detailing the differences between the old and new databases. The changelog includes information such as entry identifiers, formulas, and last updated timestamps.

Parameters:

Name Type Description Default
old_db DataFrame

The existing database before updates.

required
new_db DataFrame

The new database containing updates.

required
differences Series

Series of identifiers that are new or updated.

required
stage str

The processing stage ('raw', 'processed', 'final').

required
Source code in energy_gnome/dataset/cathodes.py
Python
def backup_and_changelog(
    self,
    old_db: pd.DataFrame,
    new_db: pd.DataFrame,
    differences: pd.Series,
    stage: str,
) -> None:
    """
    Backup the old database and update the changelog with identified differences.

    Creates a backup of the existing database and appends a changelog entry detailing
    the differences between the old and new databases. The changelog includes
    information such as entry identifiers, formulas, and last updated timestamps.

    Args:
        old_db (pd.DataFrame): The existing database before updates.
        new_db (pd.DataFrame): The new database containing updates.
        differences (pd.Series): Series of identifiers that are new or updated.
        stage (str): The processing stage ('raw', 'processed', 'final').
    """
    if stage not in self.processing_stages:
        logger.error(f"Invalid stage: {stage}. Must be one of {self.processing_stages}.")
        raise ValueError(f"stage must be one of {self.processing_stages}.")

    backup_path = self.database_directories[stage] / "old_database.json"
    try:
        old_db.to_json(backup_path)
        logger.debug(f"Old database backed up to {backup_path}")
    except Exception as e:
        logger.error(f"Failed to backup old database to {backup_path}: {e}")
        raise OSError(f"Failed to backup old database to {backup_path}: {e}") from e

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    changelog_path = self.database_directories[stage] / "changelog.txt"
    changelog_entries = [
        f"= Change Log - {timestamp} ".ljust(70, "=") + "\n",
        "Difference old_database.json VS database.json\n",
        f"{'ID':<15}{'Formula':<30}{'Last Updated (MP)':<25}\n",
        "-" * 70 + "\n",
    ]
    # Tailoring respect father class
    for identifier in differences["battery_id"]:
        row = new_db.loc[new_db["battery_id"] == identifier]
        if not row.empty:
            formula = row["battery_formula"].values[0]
            last_updated = row["last_updated"].values[0]
        else:
            formula = "N/A"
            last_updated = "N/A"
        changelog_entries.append(f"{identifier:<15}{formula:<30}{last_updated:<20}\n")

    try:
        with open(changelog_path, "a") as file:
            file.writelines(changelog_entries)
        logger.debug(f"Changelog updated at {changelog_path} with {len(differences)} changes.")
    except Exception as e:
        logger.error(f"Failed to update changelog at {changelog_path}: {e}")
        raise OSError(f"Failed to update changelog at {changelog_path}: {e}") from e

compare_and_update(new_db, stage)

Compare and update the database with new entries.

Identifies new entries and updates the database accordingly. Ensures that raw data remains immutable by preventing updates unless explicitly allowed.

Parameters:

Name Type Description Default
new_db DataFrame

New database to compare.

required
stage str

Processing stage ("raw", "processed", "final").

required

Returns:

Type Description
DataFrame

pd.DataFrame: Updated database containing new entries.

Raises:

Type Description
ImmutableRawDataError

If attempting to modify immutable raw data.

Source code in energy_gnome/dataset/cathodes.py
Python
def compare_and_update(self, new_db: pd.DataFrame, stage: str) -> pd.DataFrame:
    """
    Compare and update the database with new entries.

    Identifies new entries and updates the database accordingly. Ensures that raw data
    remains immutable by preventing updates unless explicitly allowed.

    Args:
        new_db (pd.DataFrame): New database to compare.
        stage (str): Processing stage ("raw", "processed", "final").

    Returns:
        pd.DataFrame: Updated database containing new entries.

    Raises:
        ImmutableRawDataError: If attempting to modify immutable raw data.
    """
    old_db = self.load_database(stage=stage)
    db_diff = self.compare_databases(new_db, stage)
    if not db_diff.empty:
        logger.warning(f"The new database contains {len(db_diff)} new items.")

        if stage == "raw" and not self._update_raw:
            logger.error("Raw data must be treated as immutable!")
            logger.error(
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
            raise ImmutableRawDataError(
                "Raw data must be treated as immutable!\n"
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
        else:
            if stage == "raw":
                logger.info(
                    "Be careful you are changing the raw data which must be treated as immutable!"
                )
            logger.info(
                f"Updating the {stage} data and saving it in {self.database_paths[stage]}."
            )
            self.backup_and_changelog(
                old_db,
                new_db,
                db_diff,
                stage,
            )
            self.databases[stage] = new_db
            self.save_database(stage)
    else:
        logger.info("No new items found. No update required.")

compare_databases(new_db, stage)

Compare two databases and identify new entry IDs.

Parameters:

Name Type Description Default
new_db DataFrame

New database to compare.

required
stage str

Processing stage ("raw", "processed", "final").

required

Returns:

Type Description
DataFrame

pd.DataFrame: Subset of new_db containing only new entry IDs.

Source code in energy_gnome/dataset/cathodes.py
Python
def compare_databases(self, new_db: pd.DataFrame, stage: str) -> pd.DataFrame:
    """
    Compare two databases and identify new entry IDs.

    Args:
        new_db (pd.DataFrame): New database to compare.
        stage (str): Processing stage ("raw", "processed", "final").

    Returns:
        pd.DataFrame: Subset of `new_db` containing only new entry IDs.
    """
    old_db = self.load_database(stage=stage)
    if not old_db.empty:
        new_ids_set = set(new_db["battery_id"])
        old_ids_set = set(old_db["battery_id"])
        new_ids_only = new_ids_set - old_ids_set
        logger.debug(f"Found {len(new_ids_only)} new battery IDs in the new database.")
        return new_db[new_db["battery_id"].isin(new_ids_only)]
    else:
        logger.warning("Nothing to compare here...")
        return new_db

copy_cif_files(stage, charge_state, mute_progress_bars=True)

Copy CIF files from the raw stage to another processing stage.

Copies CIF files corresponding to the specified cathode state from the 'raw' processing stage to the target stage. Updates the database with the new file paths.

Parameters:

Name Type Description Default
stage str

Target processing stage ('processed', 'final').

required
charge_state str

The charge state of the cathode ('charge' or 'discharge').

required
mute_progress_bars bool

Disable progress bar if True. Defaults to True.

True

Raises:

Type Description
ValueError

If the target stage is 'raw'.

MissingData

If the source CIF directory does not exist or is empty.

Source code in energy_gnome/dataset/cathodes.py
Python
def copy_cif_files(
    self,
    stage: str,
    charge_state: str,
    mute_progress_bars: bool = True,
) -> None:
    """
    Copy CIF files from the raw stage to another processing stage.

    Copies CIF files corresponding to the specified cathode state from the 'raw'
    processing stage to the target stage. Updates the database with the new file paths.

    Args:
        stage (str): Target processing stage ('processed', 'final').
        charge_state (str): The charge state of the cathode ('charge' or 'discharge').
        mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

    Raises:
        ValueError: If the target stage is 'raw'.
        MissingData: If the source CIF directory does not exist or is empty.
    """
    if stage == "raw":
        logger.error("Stage argument cannot be 'raw'.")
        logger.error("You can only copy from 'raw' to other stages, not to 'raw' itself.")
        raise ValueError("Stage argument cannot be 'raw'.")

    source_dir = self.database_directories["raw"] / charge_state
    saving_dir = self.database_directories[stage] / charge_state

    # Clean the saving directory if it exists
    if saving_dir.exists():
        logger.warning(f"Cleaning the content in {saving_dir}")
        sh.rmtree(saving_dir)

    # Check if source CIF directory exists and is not empty
    if not source_dir.exists() or not any(source_dir.iterdir()):
        logger.warning(
            f"The raw CIF directory does not exist or is empty. Check: {source_dir}"
        )
        raise MissingData(
            f"The raw CIF directory does not exist or is empty. Check: {source_dir}"
        )

    # Create the saving directory
    saving_dir.mkdir(parents=True, exist_ok=False)
    self.databases[stage][f"{charge_state}_path"] = pd.Series(dtype=str)

    # Copy CIF files and update database paths
    for material_id in tqdm(
        self.databases[stage][f"id_{charge_state}"],
        desc=f"Copying {charge_state} cathodes ('raw' -> '{stage}')",
        disable=mute_progress_bars,
    ):
        try:
            # Locate the row in the database corresponding to the material ID
            i_row = (
                self.databases[stage]
                .index[self.databases[stage][f"id_{charge_state}"] == material_id]
                .tolist()[0]
            )

            # Define source and destination CIF file paths
            source_cif_path = source_dir / f"{material_id}.cif"
            cif_path = saving_dir / f"{material_id}.cif"

            # Copy the CIF file
            sh.copyfile(source_cif_path, cif_path)

            # Update the database with the new CIF file path
            self.databases[stage].at[i_row, f"{charge_state}_path"] = str(cif_path)

        except IndexError:
            logger.error(f"Material ID {material_id} not found in the database.")
            raise MissingData(f"Material ID {material_id} not found in the database.")
        except Exception as e:
            logger.error(f"Failed to copy CIF for Material ID {material_id}: {e}")
            raise OSError(f"Failed to copy CIF for Material ID {material_id}: {e}") from e

    # Save the updated database
    self.save_database(stage)
    logger.info(f"CIF files copied to stage '{stage}' and database updated successfully.")

retrieve_materials(stage, charge_state, mute_progress_bars=True)

Retrieve material structures from the Material Project API.

Fetches material structures based on the processing stage and charge state.

Parameters:

Name Type Description Default
stage str

Processing stage ('raw', 'processed', 'final').

required
charge_state str

Cathode charge state ('charge', 'discharge').

required
mute_progress_bars bool

Disable progress bar if True. Defaults to True.

True

Returns:

Type Description
list[Any]

List[Any]: List of retrieved material objects.

Raises:

Type Description
ValueError

If the charge_state is invalid.

MissingData

If the required data is missing in the database.

Source code in energy_gnome/dataset/cathodes.py
Python
def retrieve_materials(
    self, stage: str, charge_state: str, mute_progress_bars: bool = True
) -> list[Any]:
    """
    Retrieve material structures from the Material Project API.

    Fetches material structures based on the processing stage and charge state.

    Args:
        stage (str): Processing stage ('raw', 'processed', 'final').
        charge_state (str): Cathode charge state ('charge', 'discharge').
        mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

    Returns:
        List[Any]: List of retrieved material objects.

    Raises:
        ValueError: If the charge_state is invalid.
        MissingData: If the required data is missing in the database.
    """
    if charge_state not in ["charge", "discharge"]:
        logger.error(f"Invalid charge_state: {charge_state}. Must be 'charge' or 'discharge'.")
        raise ValueError("charge_state must be 'charge' or 'discharge'.")

    material_ids = self.databases[stage][f"id_{charge_state}"].tolist()
    if not material_ids:
        logger.warning(
            f"No material IDs found for stage '{stage}' and charge_state '{charge_state}'."
        )
        raise MissingData(
            f"No material IDs found for stage '{stage}' and charge_state '{charge_state}'."
        )

    logger.debug(
        f"Retrieving materials for stage '{stage}' and charge_state '{charge_state}'."
    )
    query = get_material_by_id(
        material_ids,
        mute_progress_bars=mute_progress_bars,
    )
    return query

retrieve_models(mute_progress_bars=True)

Retrieve battery models from the Materials Project API.

Connects to the Material Project API using MPRester, queries for materials based on the working ion and processing stage, and retrieves the specified fields. Cleans the data by removing entries with missing critical identifiers.

Parameters:

Name Type Description Default
mute_progress_bars bool

If True, mutes the Material Project API progress bars. Defaults to True.

True

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the retrieved and cleaned models.

Raises:

Type Description
Exception

If the API query fails.

Source code in energy_gnome/dataset/cathodes.py
Python
def retrieve_models(self, mute_progress_bars: bool = True) -> pd.DataFrame:
    """
    Retrieve battery models from the Materials Project API.

    Connects to the Material Project API using MPRester, queries for materials
    based on the working ion and processing stage, and retrieves the specified fields.
    Cleans the data by removing entries with missing critical identifiers.

    Args:
        mute_progress_bars (bool, optional):
            If `True`, mutes the Material Project API progress bars.
            Defaults to `True`.

    Returns:
        pd.DataFrame: DataFrame containing the retrieved and cleaned models.

    Raises:
        Exception: If the API query fails.
    """
    mp_api_key = get_mp_api_key()
    logger.debug("MP querying for insertion battery models.")

    with MPRester(mp_api_key, mute_progress_bars=mute_progress_bars) as mpr:
        try:
            query = mpr.materials.insertion_electrodes.search(
                working_ion=self.working_ion, fields=BAT_FIELDS
            )
            logger.info(
                f"MP query successful, {len(query)} {self.working_ion}-ion batteries found."
            )
        except Exception as e:
            raise e
    logger.debug("Converting MP query results into DataFrame.")
    battery_models_database = convert_my_query_to_dataframe(
        query, mute_progress_bars=mute_progress_bars
    )

    # Fast cleaning
    logger.debug("Removing NaN")
    battery_models_database = battery_models_database.dropna(
        axis=0, how="any", subset=["id_charge", "id_discharge"]
    )
    battery_models_database = battery_models_database.dropna(axis=1, how="all")
    self._battery_models = battery_models_database
    logger.success(f"{self.working_ion}-ion batteries model retrieved successfully.")
    return self._battery_models

retrieve_remote(mute_progress_bars=True)

Retrieve models from the Material Project API.

Wrapper method to call retrieve_models.

Parameters:

Name Type Description Default
mute_progress_bars bool

If True, mutes the Material Project API progress bars. Defaults to True.

True

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the retrieved models.

Source code in energy_gnome/dataset/cathodes.py
Python
def retrieve_remote(self, mute_progress_bars: bool = True) -> pd.DataFrame:
    """
    Retrieve models from the Material Project API.

    Wrapper method to call `retrieve_models`.

    Args:
        mute_progress_bars (bool, optional):
            If `True`, mutes the Material Project API progress bars.
            Defaults to `True`.

    Returns:
        pd.DataFrame: DataFrame containing the retrieved models.
    """
    return self.retrieve_models(mute_progress_bars=mute_progress_bars)

save_cif_files(stage, materials_mp_query, charge_state, mute_progress_bars=True)

Save CIF files for materials and update the database accordingly.

Manages the saving of CIF files for each material and updates the database with the file paths and relevant properties. Ensures that raw data remains immutable.

Parameters:

Name Type Description Default
stage str

Processing stage ('raw', 'processed', 'final').

required
materials_mp_query List[Any]

List of material query results.

required
charge_state str

The charge state of the cathode ('charge' or 'discharge').

required
mute_progress_bars bool

Disable progress bar if True. Defaults to True.

True

Raises:

Type Description
ImmutableRawDataError

If attempting to modify immutable raw data.

Source code in energy_gnome/dataset/cathodes.py
Python
def save_cif_files(
    self,
    stage: str,
    materials_mp_query: list,
    charge_state: str,
    mute_progress_bars: bool = True,
) -> None:
    """
    Save CIF files for materials and update the database accordingly.

    Manages the saving of CIF files for each material and updates the database with
    the file paths and relevant properties. Ensures that raw data remains immutable.

    Args:
        stage (str): Processing stage ('raw', 'processed', 'final').
        materials_mp_query (List[Any]): List of material query results.
        charge_state (str): The charge state of the cathode ('charge' or 'discharge').
        mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

    Raises:
        ImmutableRawDataError: If attempting to modify immutable raw data.
    """

    saving_dir = self.database_directories[stage] / charge_state

    if stage == "raw" and not self._update_raw:
        logger.error("Raw data must be treated as immutable!")
        logger.error(
            "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
        )
        raise ImmutableRawDataError(
            "Raw data must be treated as immutable!\n"
            "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
        )
    elif stage == "raw" and saving_dir.exists():
        logger.info(
            "Be careful you are changing the raw data which must be treated as immutable!"
        )

    # Clean the saving directory if it exists
    if saving_dir.exists():
        logger.warning(f"Cleaning the content in {saving_dir}")
        sh.rmtree(saving_dir)

    # Create the saving directory
    saving_dir.mkdir(parents=True, exist_ok=False)
    self.databases[stage][f"{charge_state}_path"] = pd.Series(dtype=str)

    # Save CIF files and update database paths
    for material in tqdm(
        materials_mp_query,
        desc=f"Saving {charge_state} cathodes",
        disable=mute_progress_bars,
    ):
        try:
            # Locate the row in the database corresponding to the material ID
            i_row = (
                self.databases[stage]
                .index[self.databases[stage][f"id_{charge_state}"] == material.material_id]
                .tolist()[0]
            )

            # Define the CIF file path
            cif_path = saving_dir / f"{material.material_id}.cif"

            # Save the CIF file
            material.structure.to(filename=str(cif_path))

            # Update the database with the CIF file path
            self.databases[stage].at[i_row, f"{charge_state}_path"] = str(cif_path)

        except IndexError:
            logger.error(f"Material ID {material.material_id} not found in the database.")
            raise MissingData(f"Material ID {material.material_id} not found in the database.")
        except Exception as e:
            logger.error(f"Failed to save CIF for Material ID {material.material_id}: {e}")
            raise OSError(
                f"Failed to save CIF for Material ID {material.material_id}: {e}"
            ) from e

    # Save the updated database
    self.save_database(stage)
    logger.info(f"CIF files for stage '{stage}' saved and database updated successfully.")

energy_gnome.dataset.PerovskiteDatabase

Bases: BaseDatabase

Source code in energy_gnome/dataset/perovskites.py
Python
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
class PerovskiteDatabase(BaseDatabase):
    def __init__(
        self,
        data_dir: Path | str = DATA_DIR,
        external_perovproj_path: Path | str = EXTERNAL_DATA_DIR
        / Path("perovskites")
        / Path("perovproject_db.json"),
    ):
        """
        Initialize the PerovskiteDatabase with a root data directory and processing stage.

        Sets up the directory structure for storing data across different processing stages
        (`raw/`, `processed/`, `final/`) and initializes placeholders for database paths and data.

        Args:
            data_dir (Path, optional): Root directory path for storing data.
                                       Defaults to DATA_DIR from config.

        Raises:
            NotImplementedError: If the specified processing stage is not supported.
            ImmutableRawDataError: If attempting to set an unsupported processing stage.
        """
        super().__init__(data_dir=data_dir)

        # Initialize directories, paths, and databases for each stage
        self.database_directories = {
            stage: self.data_dir / stage / "perovskites" for stage in self.processing_stages
        }
        for stage_dir in self.database_directories.values():
            stage_dir.mkdir(parents=True, exist_ok=True)

        self.database_paths = {
            stage: dir_path / "database.json"
            for stage, dir_path in self.database_directories.items()
        }

        self.databases = {stage: pd.DataFrame() for stage in self.processing_stages}
        self._perovskites = pd.DataFrame()
        self.external_perovproj_path: Path | str = external_perovproj_path

    def retrieve_remote(self, mute_progress_bars: bool = True) -> pd.DataFrame:
        """
        Retrieve materials from the Material Project API.

        Wrapper method to call `retrieve_materials`.

        Args:
            mute_progress_bars (bool, optional):
                If `True`, mutes the Material Project API progress bars.
                Defaults to `True`.

        Returns:
            pd.DataFrame: DataFrame containing the retrieved materials.
        """
        return self.retrieve_materials(mute_progress_bars=mute_progress_bars)

    def _pre_retrieve_robo(self, mute_progress_bars: bool = True) -> list[str]:
        mp_api_key = get_mp_api_key()
        with MPRester(mp_api_key, mute_progress_bars=mute_progress_bars) as mpr:
            try:
                query = mpr.materials.robocrys.search(keywords=["Perovskite", "perovskite"])
                logger.info(
                    f"MP query successful, {len(query)} perovskite IDs found through Robocrystallographer."
                )
            except Exception as e:
                raise e
        ids_list_robo = [q.material_id for q in query]
        return ids_list_robo

    def _pre_retrieve_perovproj(self, mute_progress_bars: bool = True) -> list[str]:
        mp_api_key = get_mp_api_key()
        with open(self.external_perovproj_path) as f:
            dict_ = json.load(f)
        with MPRester(mp_api_key, mute_progress_bars=mute_progress_bars) as mpr:
            try:
                query = mpr.materials.summary.search(formula=dict_, fields="material_id")
                logger.info(
                    f"MP query successful, {len(query)} perovskite IDs found through Perovskite Project formulae."
                )
            except Exception as e:
                raise e
        ids_list_perovproj = [q.material_id for q in query]
        return ids_list_perovproj

    def retrieve_materials(self, mute_progress_bars: bool = True) -> pd.DataFrame:
        """
        Retrieve perovskites from the Materials Project API.

        Connects to the Material Project API using MPRester, queries for materials, and retrieves the specified fields.
        Cleans the data by removing entries with missing critical identifiers.

        Args:
            mute_progress_bars (bool, optional):
                If `True`, mutes the Material Project API progress bars.
                Defaults to `True`.

        Returns:
            pd.DataFrame: DataFrame containing the retrieved and cleaned models.

        Raises:
            Exception: If the API query fails.
        """
        mp_api_key = get_mp_api_key()
        ids_list_robo = self._pre_retrieve_robo(mute_progress_bars=mute_progress_bars)
        ids_list_perovproj = self._pre_retrieve_perovproj(mute_progress_bars=mute_progress_bars)
        logger.debug("MP querying for perovskites.")

        ids_list = ids_list_robo + ids_list_perovproj
        unique_ids = list()
        for x in ids_list:
            if x not in unique_ids:
                unique_ids.append(x)

        with MPRester(mp_api_key, mute_progress_bars=mute_progress_bars) as mpr:
            try:
                query = mpr.materials.summary.search(
                    material_ids=unique_ids, fields=MAT_PROPERTIES
                )
                logger.info(
                    f"MP query successful, {len(query)} perovskites found through Robocrystallographer and Perovskite Project formulae."
                )
            except Exception as e:
                raise e
        logger.debug("Converting MP query results into DataFrame.")
        perovskites_database = convert_my_query_to_dataframe_perovskites(
            query, mute_progress_bars=mute_progress_bars
        )

        query_ids = list()
        for m in query:
            query_ids.append(m.material_id)

        # Fast cleaning
        logger.debug("Removing NaN (rows)")
        logger.debug(f"size DB before = {len(perovskites_database)}")
        perovskites_database = perovskites_database.dropna(
            axis=0, how="any", subset=BAND_CRITICAL_FIELD
        )
        logger.debug(f"size DB after = {len(perovskites_database)}")
        logger.debug("Removing NaN (cols)")
        logger.debug(f"size DB before = {len(perovskites_database)}")
        perovskites_database = perovskites_database.dropna(axis=1, how="all")
        logger.debug(f"size DB after = {len(perovskites_database)}")

        # Filtering
        logger.debug("Removing metallic perovskites.")
        logger.debug(f"size DB before = {len(perovskites_database)}")
        filtered_perov_database = perovskites_database[~(perovskites_database["is_metal"])]
        logger.debug(f"size DB after = {len(filtered_perov_database)}")

        query_ids_filtered = filtered_perov_database["material_id"]
        diff = set(query_ids) - set(query_ids_filtered)

        reach_end = False
        while not reach_end:
            for i, q in enumerate(query):
                if q.material_id in diff:
                    query.pop(i)
                    break
            if i == len(query) - 1:
                reach_end = True

        self._perovskites = filtered_perov_database.copy()

        logger.success("Perovskites retrieved successfully.")
        return self._perovskites, query

    def compare_databases(self, new_db: pd.DataFrame, stage: str) -> pd.DataFrame:
        """
        Compare two databases and identify new entry IDs.

        Args:
            new_db (pd.DataFrame): New database to compare.
            stage (str): Processing stage ("raw", "processed", "final").

        Returns:
            pd.DataFrame: Subset of `new_db` containing only new entry IDs.
        """
        old_db = self.load_database(stage=stage)
        if not old_db.empty:
            new_ids_set = set(new_db["material_id"])
            old_ids_set = set(old_db["material_id"])
            new_ids_only = new_ids_set - old_ids_set
            logger.debug(f"Found {len(new_ids_only)} new perovskite IDs in the new database.")
            return new_db[new_db["material_id"].isin(new_ids_only)]
        else:
            logger.warning("Nothing to compare here...")
            return new_db

    def backup_and_changelog(
        self,
        old_db: pd.DataFrame,
        new_db: pd.DataFrame,
        differences: pd.Series,
        stage: str,
    ) -> None:
        """
        Backup the old database and update the changelog with identified differences.

        Creates a backup of the existing database and appends a changelog entry detailing
        the differences between the old and new databases. The changelog includes
        information such as entry identifiers, formulas, and last updated timestamps.

        Args:
            old_db (pd.DataFrame): The existing database before updates.
            new_db (pd.DataFrame): The new database containing updates.
            differences (pd.Series): Series of identifiers that are new or updated.
            stage (str): The processing stage ('raw', 'processed', 'final').
        """
        if stage not in self.processing_stages:
            logger.error(f"Invalid stage: {stage}. Must be one of {self.processing_stages}.")
            raise ValueError(f"stage must be one of {self.processing_stages}.")

        backup_path = self.database_directories[stage] / "old_database.json"
        try:
            old_db.to_json(backup_path)
            logger.debug(f"Old database backed up to {backup_path}")
        except Exception as e:
            logger.error(f"Failed to backup old database to {backup_path}: {e}")
            raise OSError(f"Failed to backup old database to {backup_path}: {e}") from e

        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        changelog_path = self.database_directories[stage] / "changelog.txt"
        changelog_entries = [
            f"= Change Log - {timestamp} ".ljust(70, "=") + "\n",
            "Difference old_database.json VS database.json\n",
            f"{'ID':<15}{'Formula':<30}{'Last Updated (MP)':<25}\n",
            "-" * 70 + "\n",
        ]
        # Tailoring respect father class
        for identifier in differences["material_id"]:
            row = new_db.loc[new_db["material_id"] == identifier]
            if not row.empty:
                formula = row["formula_pretty"].values[0]
                last_updated = row["last_updated"].values[0]
            else:
                formula = "N/A"
                last_updated = "N/A"
            changelog_entries.append(f"{identifier:<15}{formula:<30}{last_updated:<20}\n")

        try:
            with open(changelog_path, "a") as file:
                file.writelines(changelog_entries)
            logger.debug(f"Changelog updated at {changelog_path} with {len(differences)} changes.")
        except Exception as e:
            logger.error(f"Failed to update changelog at {changelog_path}: {e}")
            raise OSError(f"Failed to update changelog at {changelog_path}: {e}") from e

    def compare_and_update(self, new_db: pd.DataFrame, stage: str) -> pd.DataFrame:
        """
        Compare and update the database with new entries.

        Identifies new entries and updates the database accordingly. Ensures that raw data
        remains immutable by preventing updates unless explicitly allowed.

        Args:
            new_db (pd.DataFrame): New database to compare.
            stage (str): Processing stage ("raw", "processed", "final").

        Returns:
            pd.DataFrame: Updated database containing new entries.

        Raises:
            ImmutableRawDataError: If attempting to modify immutable raw data.
        """
        old_db = self.load_database(stage=stage)
        db_diff = self.compare_databases(new_db, stage)
        if not db_diff.empty:
            logger.warning(f"The new database contains {len(db_diff)} new items.")

            if stage == "raw" and not self._update_raw:
                logger.error("Raw data must be treated as immutable!")
                logger.error(
                    "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
                )
                raise ImmutableRawDataError(
                    "Raw data must be treated as immutable!\n"
                    "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
                )
            else:
                if stage == "raw":
                    logger.info(
                        "Be careful you are changing the raw data which must be treated as immutable!"
                    )
                logger.info(
                    f"Updating the {stage} data and saving it in {self.database_paths[stage]}."
                )
                self.backup_and_changelog(
                    old_db,
                    new_db,
                    db_diff,
                    stage,
                )
                self.databases[stage] = new_db
                self.save_database(stage)
        else:
            logger.info("No new items found. No update required.")

    def _add_materials_properties_columns(self, stage: str) -> pd.DataFrame:
        """
        Add material properties columns to the database for a given perovskite.

        Args:
            stage (str): Processing stage ('raw', 'processed', 'final').

        Returns:
            pd.DataFrame: Updated database with material properties columns.

        Raises:
            ImmutableRawDataError: If attempting to modify immutable raw data.
        """
        pass

    def add_material_properties(
        self,
        stage: str,
        materials_mp_query: list,
        mute_progress_bars: bool = True,
    ) -> pd.DataFrame:
        """
        Add material properties to the database from Material Project query results.

        Saves CIF files for each material in the query and updates the database with file paths and properties.

        Args:
            stage (str): Processing stage ('raw', 'processed', 'final').
            materials_mp_query (List[Any]): List of material query results.
            mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

        Returns:
            pd.DataFrame: Updated database with material properties.

        Raises:
            ImmutableRawDataError: If attempting to modify immutable raw data.
            KeyError: If a material ID is not found in the database.
        """
        pass

    def save_cif_files(
        self,
        stage: str,
        materials_mp_query: list,
        mute_progress_bars: bool = True,
    ) -> None:
        """
        Save CIF files for materials and update the database accordingly.

        Manages the saving of CIF files for each material and updates the database with
        the file paths and relevant properties. Ensures that raw data remains immutable.

        Args:
            stage (str): Processing stage ('raw', 'processed', 'final').
            materials_mp_query (List[Any]): List of material query results.
            mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

        Raises:
            ImmutableRawDataError: If attempting to modify immutable raw data.
        """

        saving_dir = self.database_directories[stage] / "structures/"

        if stage == "raw" and not self._update_raw:
            logger.error("Raw data must be treated as immutable!")
            logger.error(
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
            raise ImmutableRawDataError(
                "Raw data must be treated as immutable!\n"
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
        elif stage == "raw" and saving_dir.exists():
            logger.info(
                "Be careful you are changing the raw data which must be treated as immutable!"
            )

        # Clean the saving directory if it exists
        # ### è necessario farlo anche se non aggiorniamo il db? potremmo ridurre un po' i tempi
        if saving_dir.exists():
            logger.warning(f"Cleaning the content in {saving_dir}")
            sh.rmtree(saving_dir)

        # Create the saving directory
        saving_dir.mkdir(parents=True, exist_ok=False)
        self.databases[stage]["cif_path"] = pd.Series(dtype=str)

        # Save CIF files and update database paths
        for material in tqdm(
            materials_mp_query,
            desc="Saving perovskites",
            disable=mute_progress_bars,
        ):
            try:
                # Locate the row in the database corresponding to the material ID
                i_row = (
                    self.databases[stage]
                    .index[self.databases[stage]["material_id"] == material.material_id]
                    .tolist()[0]
                )

                # Define the CIF file path
                cif_path = saving_dir / f"{material.material_id}.cif"

                # Save the CIF file
                material.structure.to(filename=str(cif_path))

                # Update the database with the CIF file path
                self.databases[stage].at[i_row, "cif_path"] = str(cif_path)

            except IndexError:
                logger.error(f"Material ID {material.material_id} not found in the database.")
                raise MissingData(f"Material ID {material.material_id} not found in the database.")
            except Exception as e:
                logger.error(f"Failed to save CIF for Material ID {material.material_id}: {e}")
                raise OSError(
                    f"Failed to save CIF for Material ID {material.material_id}: {e}"
                ) from e

        # Save the updated database
        self.save_database(stage)
        logger.info(f"CIF files for stage '{stage}' saved and database updated successfully.")

    def copy_cif_files(
        self,
        stage: str,
        mute_progress_bars: bool = True,
    ) -> None:
        """
        Copy CIF files from the raw stage to another processing stage.

        Copies CIF files corresponding to the specified cathode state from the 'raw'
        processing stage to the target stage. Updates the database with the new file paths.

        Args:
            stage (str): Target processing stage ('processed', 'final').
            mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

        Raises:
            ValueError: If the target stage is 'raw'.
            MissingData: If the source CIF directory does not exist or is empty.
        """
        if stage == "raw":
            logger.error("Stage argument cannot be 'raw'.")
            logger.error("You can only copy from 'raw' to other stages, not to 'raw' itself.")
            raise ValueError("Stage argument cannot be 'raw'.")

        source_dir = self.database_directories["raw"]
        saving_dir = self.database_directories[stage]

        # Clean the saving directory if it exists
        if saving_dir.exists():
            logger.warning(f"Cleaning the content in {saving_dir}")
            sh.rmtree(saving_dir)

        # Check if source CIF directory exists and is not empty
        if not source_dir.exists() or not any(source_dir.iterdir()):
            logger.warning(
                f"The raw CIF directory does not exist or is empty. Check: {source_dir}"
            )
            raise MissingData(
                f"The raw CIF directory does not exist or is empty. Check: {source_dir}"
            )

        # Create the saving directory
        saving_dir.mkdir(parents=True, exist_ok=False)
        self.databases[stage] = pd.Series(dtype=str)

        # Copy CIF files and update database paths
        for material_id in tqdm(
            self.databases[stage],
            desc=f"Copying perovskites ('raw' -> '{stage}')",
            disable=mute_progress_bars,
        ):
            try:
                # Locate the row in the database corresponding to the material ID
                i_row = (
                    self.databases[stage]
                    .index[self.databases[stage]["material_id"] == material_id]
                    .tolist()[0]
                )

                # Define source and destination CIF file paths
                source_cif_path = source_dir / f"{material_id}.cif"
                cif_path = saving_dir / f"{material_id}.cif"

                # Copy the CIF file
                sh.copyfile(source_cif_path, cif_path)

                # Update the database with the new CIF file path
                self.databases[stage].at[i_row] = str(cif_path)

            except IndexError:
                logger.error(f"Material ID {material_id} not found in the database.")
                raise MissingData(f"Material ID {material_id} not found in the database.")
            except Exception as e:
                logger.error(f"Failed to copy CIF for Material ID {material_id}: {e}")
                raise OSError(f"Failed to copy CIF for Material ID {material_id}: {e}") from e

        # Save the updated database
        self.save_database(stage)
        logger.info(f"CIF files copied to stage '{stage}' and database updated successfully.")

    def __repr__(self) -> str:
        """
        Text representation of the PerovskiteDatabase instance.
        Used for print() and str() calls.

        Returns:
            str: ASCII table representation of the database
        """
        # Gather information about each stage
        data = {
            "Stage": [],
            "Entries": [],
            "Last Modified": [],
            "Size": [],
            "Storage Path": [],
        }

        # Calculate column widths
        widths = [10, 8, 17, 10, 55]

        for stage in self.processing_stages:
            # Get database info
            db = self.databases[stage]
            path = self.database_paths[stage]

            # Get file modification time and size if file exists
            if path.exists():
                modified = path.stat().st_mtime
                modified_time = pd.Timestamp.fromtimestamp(modified).strftime("%Y-%m-%d %H:%M")
                size = path.stat().st_size / 1024  # Convert to KB
                size_str = f"{size:.1f} KB" if size < 1024 else f"{size / 1024:.1f} MB"
            else:
                modified_time = "Not created"
                size_str = "0 KB"

            path_str = str(path.resolve())
            if len(path_str) > widths[4]:
                path_str = ".." + path_str[len(path_str) - widths[4] + 3 :]

            # Append data
            data["Stage"].append(stage.capitalize())
            data["Entries"].append(len(db))
            data["Last Modified"].append(modified_time)
            data["Size"].append(size_str)
            data["Storage Path"].append(path_str)

        # Create DataFrame
        info_df = pd.DataFrame(data)

        # Text representation for terminal/print
        def create_separator(widths):
            return "+" + "+".join("-" * (w + 1) for w in widths) + "+"

        # Create the text representation
        lines = []

        # Add title
        title = f" {self.__class__.__name__} Summary "
        lines.append(f"\n{title:=^{sum(widths) + len(widths) * 2 + 1}}")

        # Add header
        separator = create_separator(widths)
        lines.append(separator)

        header = (
            "|" + "|".join(f" {col:<{widths[i]}}" for i, col in enumerate(info_df.columns)) + "|"
        )
        lines.append(header)
        lines.append(separator)

        # Add data rows
        for _, row in info_df.iterrows():
            line = "|" + "|".join(f" {str(val):<{widths[i]}}" for i, val in enumerate(row)) + "|"
            lines.append(line)

        # Add bottom separator
        lines.append(separator)

        return "\n".join(lines)

    def _repr_html_(self) -> str:
        """
        HTML representation of the PerovskiteDatabase instance.
        Used for Jupyter notebook display.

        Returns:
            str: HTML representation of the database
        """
        # Gather information about each stage
        data = {
            "Stage": [],
            "Entries": [],
            "Last Modified": [],
            "Size": [],
            "Storage Path": [],
        }

        for stage in self.processing_stages:
            # Get database info
            db = self.databases[stage]
            path = self.database_paths[stage]

            # Get file modification time and size if file exists
            if path.exists():
                modified = path.stat().st_mtime
                modified_time = pd.Timestamp.fromtimestamp(modified).strftime("%Y-%m-%d %H:%M")
                size = path.stat().st_size / 1024  # Convert to KB
                size_str = f"{size:.1f} KB" if size < 1024 else f"{size / 1024:.1f} MB"
            else:
                modified_time = "Not created"
                size_str = "0 KB"

            # Append data
            data["Stage"].append(stage.capitalize())
            data["Entries"].append(len(db))
            data["Last Modified"].append(modified_time)
            data["Size"].append(size_str)
            data["Storage Path"].append(str(path.resolve()))

        # Create DataFrame
        info_df = pd.DataFrame(data)

        # Generate header row
        header_cells = " ".join(
            f'<th style="padding: 12px 15px; text-align: left;">{col}</th>'
            for col in info_df.columns
        )

        # Generate table rows
        table_rows = ""
        for _, row in info_df.iterrows():
            cells = "".join(f'<td style="padding: 12px 15px;">{val}</td>' for val in row)
            table_rows += f"<tr style='border-bottom: 1px solid #e9ecef;'>{cells}</tr>"

        # Create the complete HTML
        html = (
            """<style>
                @media (prefers-color-scheme: dark) {
                    .database-container { background-color: #1e1e1e !important; }
                    .database-title { color: #e0e0e0 !important; }
                    .database-table { background-color: #2d2d2d !important; }
                    .database-header { background-color: #4a4a4a !important; }
                    .database-cell { border-color: #404040 !important; }
                    .database-info { color: #b0b0b0 !important; }
                }
            </style>"""
            '<div style="font-family: Arial, sans-serif; padding: 20px; background:transparent; '
            'border-radius: 8px;">'
            f'<h3 style="color: #58bac7; margin-bottom: 15px;">{self.__class__.__name__}</h3>'
            '<div style="overflow-x: auto;">'
            '<table class="database-table" style="border-collapse: collapse; width: 100%;'
            ' box-shadow: 0 1px 3px rgba(0,0,0,0.1); background:transparent;">'
            # '<table style="border-collapse: collapse; width: 100%; background-color: white; '
            # 'box-shadow: 0 1px 3px rgba(0,0,0,0.1);">'
            "<thead>"
            f'<tr style="background-color: #58bac7; color: white;">{header_cells}</tr>'
            "</thead>"
            f"<tbody>{table_rows}</tbody>"
            "</table>"
            "</div>"
            '<div style="margin-top: 10px; color: #666; font-size: 1.1em;">'
            "</div>"
            "</div>"
        )
        return html

__init__(data_dir=DATA_DIR, external_perovproj_path=EXTERNAL_DATA_DIR / Path('perovskites') / Path('perovproject_db.json'))

Initialize the PerovskiteDatabase with a root data directory and processing stage.

Sets up the directory structure for storing data across different processing stages (raw/, processed/, final/) and initializes placeholders for database paths and data.

Parameters:

Name Type Description Default
data_dir Path

Root directory path for storing data. Defaults to DATA_DIR from config.

DATA_DIR

Raises:

Type Description
NotImplementedError

If the specified processing stage is not supported.

ImmutableRawDataError

If attempting to set an unsupported processing stage.

Source code in energy_gnome/dataset/perovskites.py
Python
def __init__(
    self,
    data_dir: Path | str = DATA_DIR,
    external_perovproj_path: Path | str = EXTERNAL_DATA_DIR
    / Path("perovskites")
    / Path("perovproject_db.json"),
):
    """
    Initialize the PerovskiteDatabase with a root data directory and processing stage.

    Sets up the directory structure for storing data across different processing stages
    (`raw/`, `processed/`, `final/`) and initializes placeholders for database paths and data.

    Args:
        data_dir (Path, optional): Root directory path for storing data.
                                   Defaults to DATA_DIR from config.

    Raises:
        NotImplementedError: If the specified processing stage is not supported.
        ImmutableRawDataError: If attempting to set an unsupported processing stage.
    """
    super().__init__(data_dir=data_dir)

    # Initialize directories, paths, and databases for each stage
    self.database_directories = {
        stage: self.data_dir / stage / "perovskites" for stage in self.processing_stages
    }
    for stage_dir in self.database_directories.values():
        stage_dir.mkdir(parents=True, exist_ok=True)

    self.database_paths = {
        stage: dir_path / "database.json"
        for stage, dir_path in self.database_directories.items()
    }

    self.databases = {stage: pd.DataFrame() for stage in self.processing_stages}
    self._perovskites = pd.DataFrame()
    self.external_perovproj_path: Path | str = external_perovproj_path

__repr__()

Text representation of the PerovskiteDatabase instance. Used for print() and str() calls.

Returns:

Name Type Description
str str

ASCII table representation of the database

Source code in energy_gnome/dataset/perovskites.py
Python
def __repr__(self) -> str:
    """
    Text representation of the PerovskiteDatabase instance.
    Used for print() and str() calls.

    Returns:
        str: ASCII table representation of the database
    """
    # Gather information about each stage
    data = {
        "Stage": [],
        "Entries": [],
        "Last Modified": [],
        "Size": [],
        "Storage Path": [],
    }

    # Calculate column widths
    widths = [10, 8, 17, 10, 55]

    for stage in self.processing_stages:
        # Get database info
        db = self.databases[stage]
        path = self.database_paths[stage]

        # Get file modification time and size if file exists
        if path.exists():
            modified = path.stat().st_mtime
            modified_time = pd.Timestamp.fromtimestamp(modified).strftime("%Y-%m-%d %H:%M")
            size = path.stat().st_size / 1024  # Convert to KB
            size_str = f"{size:.1f} KB" if size < 1024 else f"{size / 1024:.1f} MB"
        else:
            modified_time = "Not created"
            size_str = "0 KB"

        path_str = str(path.resolve())
        if len(path_str) > widths[4]:
            path_str = ".." + path_str[len(path_str) - widths[4] + 3 :]

        # Append data
        data["Stage"].append(stage.capitalize())
        data["Entries"].append(len(db))
        data["Last Modified"].append(modified_time)
        data["Size"].append(size_str)
        data["Storage Path"].append(path_str)

    # Create DataFrame
    info_df = pd.DataFrame(data)

    # Text representation for terminal/print
    def create_separator(widths):
        return "+" + "+".join("-" * (w + 1) for w in widths) + "+"

    # Create the text representation
    lines = []

    # Add title
    title = f" {self.__class__.__name__} Summary "
    lines.append(f"\n{title:=^{sum(widths) + len(widths) * 2 + 1}}")

    # Add header
    separator = create_separator(widths)
    lines.append(separator)

    header = (
        "|" + "|".join(f" {col:<{widths[i]}}" for i, col in enumerate(info_df.columns)) + "|"
    )
    lines.append(header)
    lines.append(separator)

    # Add data rows
    for _, row in info_df.iterrows():
        line = "|" + "|".join(f" {str(val):<{widths[i]}}" for i, val in enumerate(row)) + "|"
        lines.append(line)

    # Add bottom separator
    lines.append(separator)

    return "\n".join(lines)

add_material_properties(stage, materials_mp_query, mute_progress_bars=True)

Add material properties to the database from Material Project query results.

Saves CIF files for each material in the query and updates the database with file paths and properties.

Parameters:

Name Type Description Default
stage str

Processing stage ('raw', 'processed', 'final').

required
materials_mp_query List[Any]

List of material query results.

required
mute_progress_bars bool

Disable progress bar if True. Defaults to True.

True

Returns:

Type Description
DataFrame

pd.DataFrame: Updated database with material properties.

Raises:

Type Description
ImmutableRawDataError

If attempting to modify immutable raw data.

KeyError

If a material ID is not found in the database.

Source code in energy_gnome/dataset/perovskites.py
Python
def add_material_properties(
    self,
    stage: str,
    materials_mp_query: list,
    mute_progress_bars: bool = True,
) -> pd.DataFrame:
    """
    Add material properties to the database from Material Project query results.

    Saves CIF files for each material in the query and updates the database with file paths and properties.

    Args:
        stage (str): Processing stage ('raw', 'processed', 'final').
        materials_mp_query (List[Any]): List of material query results.
        mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

    Returns:
        pd.DataFrame: Updated database with material properties.

    Raises:
        ImmutableRawDataError: If attempting to modify immutable raw data.
        KeyError: If a material ID is not found in the database.
    """
    pass

backup_and_changelog(old_db, new_db, differences, stage)

Backup the old database and update the changelog with identified differences.

Creates a backup of the existing database and appends a changelog entry detailing the differences between the old and new databases. The changelog includes information such as entry identifiers, formulas, and last updated timestamps.

Parameters:

Name Type Description Default
old_db DataFrame

The existing database before updates.

required
new_db DataFrame

The new database containing updates.

required
differences Series

Series of identifiers that are new or updated.

required
stage str

The processing stage ('raw', 'processed', 'final').

required
Source code in energy_gnome/dataset/perovskites.py
Python
def backup_and_changelog(
    self,
    old_db: pd.DataFrame,
    new_db: pd.DataFrame,
    differences: pd.Series,
    stage: str,
) -> None:
    """
    Backup the old database and update the changelog with identified differences.

    Creates a backup of the existing database and appends a changelog entry detailing
    the differences between the old and new databases. The changelog includes
    information such as entry identifiers, formulas, and last updated timestamps.

    Args:
        old_db (pd.DataFrame): The existing database before updates.
        new_db (pd.DataFrame): The new database containing updates.
        differences (pd.Series): Series of identifiers that are new or updated.
        stage (str): The processing stage ('raw', 'processed', 'final').
    """
    if stage not in self.processing_stages:
        logger.error(f"Invalid stage: {stage}. Must be one of {self.processing_stages}.")
        raise ValueError(f"stage must be one of {self.processing_stages}.")

    backup_path = self.database_directories[stage] / "old_database.json"
    try:
        old_db.to_json(backup_path)
        logger.debug(f"Old database backed up to {backup_path}")
    except Exception as e:
        logger.error(f"Failed to backup old database to {backup_path}: {e}")
        raise OSError(f"Failed to backup old database to {backup_path}: {e}") from e

    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    changelog_path = self.database_directories[stage] / "changelog.txt"
    changelog_entries = [
        f"= Change Log - {timestamp} ".ljust(70, "=") + "\n",
        "Difference old_database.json VS database.json\n",
        f"{'ID':<15}{'Formula':<30}{'Last Updated (MP)':<25}\n",
        "-" * 70 + "\n",
    ]
    # Tailoring respect father class
    for identifier in differences["material_id"]:
        row = new_db.loc[new_db["material_id"] == identifier]
        if not row.empty:
            formula = row["formula_pretty"].values[0]
            last_updated = row["last_updated"].values[0]
        else:
            formula = "N/A"
            last_updated = "N/A"
        changelog_entries.append(f"{identifier:<15}{formula:<30}{last_updated:<20}\n")

    try:
        with open(changelog_path, "a") as file:
            file.writelines(changelog_entries)
        logger.debug(f"Changelog updated at {changelog_path} with {len(differences)} changes.")
    except Exception as e:
        logger.error(f"Failed to update changelog at {changelog_path}: {e}")
        raise OSError(f"Failed to update changelog at {changelog_path}: {e}") from e

compare_and_update(new_db, stage)

Compare and update the database with new entries.

Identifies new entries and updates the database accordingly. Ensures that raw data remains immutable by preventing updates unless explicitly allowed.

Parameters:

Name Type Description Default
new_db DataFrame

New database to compare.

required
stage str

Processing stage ("raw", "processed", "final").

required

Returns:

Type Description
DataFrame

pd.DataFrame: Updated database containing new entries.

Raises:

Type Description
ImmutableRawDataError

If attempting to modify immutable raw data.

Source code in energy_gnome/dataset/perovskites.py
Python
def compare_and_update(self, new_db: pd.DataFrame, stage: str) -> pd.DataFrame:
    """
    Compare and update the database with new entries.

    Identifies new entries and updates the database accordingly. Ensures that raw data
    remains immutable by preventing updates unless explicitly allowed.

    Args:
        new_db (pd.DataFrame): New database to compare.
        stage (str): Processing stage ("raw", "processed", "final").

    Returns:
        pd.DataFrame: Updated database containing new entries.

    Raises:
        ImmutableRawDataError: If attempting to modify immutable raw data.
    """
    old_db = self.load_database(stage=stage)
    db_diff = self.compare_databases(new_db, stage)
    if not db_diff.empty:
        logger.warning(f"The new database contains {len(db_diff)} new items.")

        if stage == "raw" and not self._update_raw:
            logger.error("Raw data must be treated as immutable!")
            logger.error(
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
            raise ImmutableRawDataError(
                "Raw data must be treated as immutable!\n"
                "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
            )
        else:
            if stage == "raw":
                logger.info(
                    "Be careful you are changing the raw data which must be treated as immutable!"
                )
            logger.info(
                f"Updating the {stage} data and saving it in {self.database_paths[stage]}."
            )
            self.backup_and_changelog(
                old_db,
                new_db,
                db_diff,
                stage,
            )
            self.databases[stage] = new_db
            self.save_database(stage)
    else:
        logger.info("No new items found. No update required.")

compare_databases(new_db, stage)

Compare two databases and identify new entry IDs.

Parameters:

Name Type Description Default
new_db DataFrame

New database to compare.

required
stage str

Processing stage ("raw", "processed", "final").

required

Returns:

Type Description
DataFrame

pd.DataFrame: Subset of new_db containing only new entry IDs.

Source code in energy_gnome/dataset/perovskites.py
Python
def compare_databases(self, new_db: pd.DataFrame, stage: str) -> pd.DataFrame:
    """
    Compare two databases and identify new entry IDs.

    Args:
        new_db (pd.DataFrame): New database to compare.
        stage (str): Processing stage ("raw", "processed", "final").

    Returns:
        pd.DataFrame: Subset of `new_db` containing only new entry IDs.
    """
    old_db = self.load_database(stage=stage)
    if not old_db.empty:
        new_ids_set = set(new_db["material_id"])
        old_ids_set = set(old_db["material_id"])
        new_ids_only = new_ids_set - old_ids_set
        logger.debug(f"Found {len(new_ids_only)} new perovskite IDs in the new database.")
        return new_db[new_db["material_id"].isin(new_ids_only)]
    else:
        logger.warning("Nothing to compare here...")
        return new_db

copy_cif_files(stage, mute_progress_bars=True)

Copy CIF files from the raw stage to another processing stage.

Copies CIF files corresponding to the specified cathode state from the 'raw' processing stage to the target stage. Updates the database with the new file paths.

Parameters:

Name Type Description Default
stage str

Target processing stage ('processed', 'final').

required
mute_progress_bars bool

Disable progress bar if True. Defaults to True.

True

Raises:

Type Description
ValueError

If the target stage is 'raw'.

MissingData

If the source CIF directory does not exist or is empty.

Source code in energy_gnome/dataset/perovskites.py
Python
def copy_cif_files(
    self,
    stage: str,
    mute_progress_bars: bool = True,
) -> None:
    """
    Copy CIF files from the raw stage to another processing stage.

    Copies CIF files corresponding to the specified cathode state from the 'raw'
    processing stage to the target stage. Updates the database with the new file paths.

    Args:
        stage (str): Target processing stage ('processed', 'final').
        mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

    Raises:
        ValueError: If the target stage is 'raw'.
        MissingData: If the source CIF directory does not exist or is empty.
    """
    if stage == "raw":
        logger.error("Stage argument cannot be 'raw'.")
        logger.error("You can only copy from 'raw' to other stages, not to 'raw' itself.")
        raise ValueError("Stage argument cannot be 'raw'.")

    source_dir = self.database_directories["raw"]
    saving_dir = self.database_directories[stage]

    # Clean the saving directory if it exists
    if saving_dir.exists():
        logger.warning(f"Cleaning the content in {saving_dir}")
        sh.rmtree(saving_dir)

    # Check if source CIF directory exists and is not empty
    if not source_dir.exists() or not any(source_dir.iterdir()):
        logger.warning(
            f"The raw CIF directory does not exist or is empty. Check: {source_dir}"
        )
        raise MissingData(
            f"The raw CIF directory does not exist or is empty. Check: {source_dir}"
        )

    # Create the saving directory
    saving_dir.mkdir(parents=True, exist_ok=False)
    self.databases[stage] = pd.Series(dtype=str)

    # Copy CIF files and update database paths
    for material_id in tqdm(
        self.databases[stage],
        desc=f"Copying perovskites ('raw' -> '{stage}')",
        disable=mute_progress_bars,
    ):
        try:
            # Locate the row in the database corresponding to the material ID
            i_row = (
                self.databases[stage]
                .index[self.databases[stage]["material_id"] == material_id]
                .tolist()[0]
            )

            # Define source and destination CIF file paths
            source_cif_path = source_dir / f"{material_id}.cif"
            cif_path = saving_dir / f"{material_id}.cif"

            # Copy the CIF file
            sh.copyfile(source_cif_path, cif_path)

            # Update the database with the new CIF file path
            self.databases[stage].at[i_row] = str(cif_path)

        except IndexError:
            logger.error(f"Material ID {material_id} not found in the database.")
            raise MissingData(f"Material ID {material_id} not found in the database.")
        except Exception as e:
            logger.error(f"Failed to copy CIF for Material ID {material_id}: {e}")
            raise OSError(f"Failed to copy CIF for Material ID {material_id}: {e}") from e

    # Save the updated database
    self.save_database(stage)
    logger.info(f"CIF files copied to stage '{stage}' and database updated successfully.")

retrieve_materials(mute_progress_bars=True)

Retrieve perovskites from the Materials Project API.

Connects to the Material Project API using MPRester, queries for materials, and retrieves the specified fields. Cleans the data by removing entries with missing critical identifiers.

Parameters:

Name Type Description Default
mute_progress_bars bool

If True, mutes the Material Project API progress bars. Defaults to True.

True

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the retrieved and cleaned models.

Raises:

Type Description
Exception

If the API query fails.

Source code in energy_gnome/dataset/perovskites.py
Python
def retrieve_materials(self, mute_progress_bars: bool = True) -> pd.DataFrame:
    """
    Retrieve perovskites from the Materials Project API.

    Connects to the Material Project API using MPRester, queries for materials, and retrieves the specified fields.
    Cleans the data by removing entries with missing critical identifiers.

    Args:
        mute_progress_bars (bool, optional):
            If `True`, mutes the Material Project API progress bars.
            Defaults to `True`.

    Returns:
        pd.DataFrame: DataFrame containing the retrieved and cleaned models.

    Raises:
        Exception: If the API query fails.
    """
    mp_api_key = get_mp_api_key()
    ids_list_robo = self._pre_retrieve_robo(mute_progress_bars=mute_progress_bars)
    ids_list_perovproj = self._pre_retrieve_perovproj(mute_progress_bars=mute_progress_bars)
    logger.debug("MP querying for perovskites.")

    ids_list = ids_list_robo + ids_list_perovproj
    unique_ids = list()
    for x in ids_list:
        if x not in unique_ids:
            unique_ids.append(x)

    with MPRester(mp_api_key, mute_progress_bars=mute_progress_bars) as mpr:
        try:
            query = mpr.materials.summary.search(
                material_ids=unique_ids, fields=MAT_PROPERTIES
            )
            logger.info(
                f"MP query successful, {len(query)} perovskites found through Robocrystallographer and Perovskite Project formulae."
            )
        except Exception as e:
            raise e
    logger.debug("Converting MP query results into DataFrame.")
    perovskites_database = convert_my_query_to_dataframe_perovskites(
        query, mute_progress_bars=mute_progress_bars
    )

    query_ids = list()
    for m in query:
        query_ids.append(m.material_id)

    # Fast cleaning
    logger.debug("Removing NaN (rows)")
    logger.debug(f"size DB before = {len(perovskites_database)}")
    perovskites_database = perovskites_database.dropna(
        axis=0, how="any", subset=BAND_CRITICAL_FIELD
    )
    logger.debug(f"size DB after = {len(perovskites_database)}")
    logger.debug("Removing NaN (cols)")
    logger.debug(f"size DB before = {len(perovskites_database)}")
    perovskites_database = perovskites_database.dropna(axis=1, how="all")
    logger.debug(f"size DB after = {len(perovskites_database)}")

    # Filtering
    logger.debug("Removing metallic perovskites.")
    logger.debug(f"size DB before = {len(perovskites_database)}")
    filtered_perov_database = perovskites_database[~(perovskites_database["is_metal"])]
    logger.debug(f"size DB after = {len(filtered_perov_database)}")

    query_ids_filtered = filtered_perov_database["material_id"]
    diff = set(query_ids) - set(query_ids_filtered)

    reach_end = False
    while not reach_end:
        for i, q in enumerate(query):
            if q.material_id in diff:
                query.pop(i)
                break
        if i == len(query) - 1:
            reach_end = True

    self._perovskites = filtered_perov_database.copy()

    logger.success("Perovskites retrieved successfully.")
    return self._perovskites, query

retrieve_remote(mute_progress_bars=True)

Retrieve materials from the Material Project API.

Wrapper method to call retrieve_materials.

Parameters:

Name Type Description Default
mute_progress_bars bool

If True, mutes the Material Project API progress bars. Defaults to True.

True

Returns:

Type Description
DataFrame

pd.DataFrame: DataFrame containing the retrieved materials.

Source code in energy_gnome/dataset/perovskites.py
Python
def retrieve_remote(self, mute_progress_bars: bool = True) -> pd.DataFrame:
    """
    Retrieve materials from the Material Project API.

    Wrapper method to call `retrieve_materials`.

    Args:
        mute_progress_bars (bool, optional):
            If `True`, mutes the Material Project API progress bars.
            Defaults to `True`.

    Returns:
        pd.DataFrame: DataFrame containing the retrieved materials.
    """
    return self.retrieve_materials(mute_progress_bars=mute_progress_bars)

save_cif_files(stage, materials_mp_query, mute_progress_bars=True)

Save CIF files for materials and update the database accordingly.

Manages the saving of CIF files for each material and updates the database with the file paths and relevant properties. Ensures that raw data remains immutable.

Parameters:

Name Type Description Default
stage str

Processing stage ('raw', 'processed', 'final').

required
materials_mp_query List[Any]

List of material query results.

required
mute_progress_bars bool

Disable progress bar if True. Defaults to True.

True

Raises:

Type Description
ImmutableRawDataError

If attempting to modify immutable raw data.

Source code in energy_gnome/dataset/perovskites.py
Python
def save_cif_files(
    self,
    stage: str,
    materials_mp_query: list,
    mute_progress_bars: bool = True,
) -> None:
    """
    Save CIF files for materials and update the database accordingly.

    Manages the saving of CIF files for each material and updates the database with
    the file paths and relevant properties. Ensures that raw data remains immutable.

    Args:
        stage (str): Processing stage ('raw', 'processed', 'final').
        materials_mp_query (List[Any]): List of material query results.
        mute_progress_bars (bool, optional): Disable progress bar if True. Defaults to True.

    Raises:
        ImmutableRawDataError: If attempting to modify immutable raw data.
    """

    saving_dir = self.database_directories[stage] / "structures/"

    if stage == "raw" and not self._update_raw:
        logger.error("Raw data must be treated as immutable!")
        logger.error(
            "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
        )
        raise ImmutableRawDataError(
            "Raw data must be treated as immutable!\n"
            "It's okay to read and copy raw data to manipulate it into new outputs, but never okay to change it in place."
        )
    elif stage == "raw" and saving_dir.exists():
        logger.info(
            "Be careful you are changing the raw data which must be treated as immutable!"
        )

    # Clean the saving directory if it exists
    # ### è necessario farlo anche se non aggiorniamo il db? potremmo ridurre un po' i tempi
    if saving_dir.exists():
        logger.warning(f"Cleaning the content in {saving_dir}")
        sh.rmtree(saving_dir)

    # Create the saving directory
    saving_dir.mkdir(parents=True, exist_ok=False)
    self.databases[stage]["cif_path"] = pd.Series(dtype=str)

    # Save CIF files and update database paths
    for material in tqdm(
        materials_mp_query,
        desc="Saving perovskites",
        disable=mute_progress_bars,
    ):
        try:
            # Locate the row in the database corresponding to the material ID
            i_row = (
                self.databases[stage]
                .index[self.databases[stage]["material_id"] == material.material_id]
                .tolist()[0]
            )

            # Define the CIF file path
            cif_path = saving_dir / f"{material.material_id}.cif"

            # Save the CIF file
            material.structure.to(filename=str(cif_path))

            # Update the database with the CIF file path
            self.databases[stage].at[i_row, "cif_path"] = str(cif_path)

        except IndexError:
            logger.error(f"Material ID {material.material_id} not found in the database.")
            raise MissingData(f"Material ID {material.material_id} not found in the database.")
        except Exception as e:
            logger.error(f"Failed to save CIF for Material ID {material.material_id}: {e}")
            raise OSError(
                f"Failed to save CIF for Material ID {material.material_id}: {e}"
            ) from e

    # Save the updated database
    self.save_database(stage)
    logger.info(f"CIF files for stage '{stage}' saved and database updated successfully.")

energy_gnome.dataset