Script makes zipped files get bigger and bigger using zipfile module, stops when terminal is killed

45 Views Asked by At

Designed a Tkinter GUI app the main goal is zipping files that matches a certain pattern. It zipps the desired files together and every action is as intended. The only problem is the first files that has been zipped getting bigger and bigger by size on disk and the actual size follows if I wont terminate the terminal it goes up to 120gb or more.(Original files are 90mb). You can think the files as batches as follows:

  • AAAA.src
  • AAAA.txt
  • AAAA.log
  • BBBB.src
  • BBBB.log
  • So the first zipped file would be AAAA.zip and this one becomes problematic. Here's a snippet from the code. Any help would be appreciated.
def copy_move_files(self, file_paths, action):
    third_input = self.get_third_input()

    for file_path in file_paths:
        if action == "copy":
            # .....
        elif action == "move":
            # .....
        elif action == "just_zip":
            file_paths_to_zip = []
            file_paths_to_delete = []
            source_with_third = os.path.join(self.source_folder, third_input)

            pattern = r'\d{8}_\d{6}_\d{14}'

            for root, _, files in os.walk(source_with_third):

                for file in files:
                    file_name = os.path.basename(file)
                    if re.match(pattern, file_name):
                        file_path = os.path.join(root, file)
                        file_paths_to_zip.append(file_path)
                        file_paths_to_delete.append(file_path)
                    else:
                        file_path = os.path.join(root, file)
                        file_paths_to_delete.append(file_path)

            for file_path in file_paths_to_zip:
                try:
                    prefix = os.path.basename(file_path)[:30]
                    zip_path = os.path.join(os.path.dirname(file_path), f'{prefix}.zip')

                    with zipfile.ZipFile(zip_path, 'a') as zipf:
                        arcname = os.path.basename(file_path)
                        zipf.write(file_path, arcname)

                    self.logger.info("File %s zipped successfully", os.path.basename(file_path))
                except Exception as e:
                    # Log the error message if an exception occurs during zipping
                    self.logger.error("Error zipping file %s: %s", os.path.basename(file_path), str(e))

            for file_path in file_paths_to_delete:
                if not file_path.endswith(".zip"):
                    os.remove(file_path)
                    self.logger.info("File %s deleted successfully", os.path.basename(file_path))

            self.logger.info("Files zipped successfully on %s", self.source_folder)


def just_zip(self):
    try:
        self.source_folder = self.source_entry.get()
        self.dest_folder = self.dest_entry.get()
        third_input = self.third_entry.get()
        last_copy_time = datetime.now().strftime("%Y-%m-%d \n %H:%M:%S")
        self.logger.info("Files will be zipped with the Chamber Name: %s", third_input)

        with open("guide.txt", "r+") as file:
            lines = file.readlines()
            lines[0] = self.source_folder + "\n"
            lines[1] = self.dest_folder + "\n"
            lines[2] = self.third_entry.get() + "\n"
            lines[3] = last_copy_time + "\n"
            file.seek(0)
            file.writelines(lines)
            file.truncate()

        with open("guide.txt", "r+") as file:
            file_lines = file.readlines()
            if len(file_lines) > 5:
                file.seek(0)
                file.writelines(file_lines[:5])
                file.truncate()

        if not self.source_folder:
            print("Please select a source.")
            self.logger.warning("No Source folder selected.")
            return

        file_paths = [os.path.join(self.source_folder, file) for file in os.listdir(self.source_folder)]
        try:
            threading.Thread(target=self.copy_move_files, args=(file_paths, "just_zip")).start()
        except Exception as e:
            logging.error("An error occurred during thread creation for zipping files: %s", str(e))
    except Exception as e:
        logging.error("An error occurred during zipping files: %s", str(e))

I tried the with block with only write mode but outcome was no zipped files. Tried zipf.close(). Started to get suspicious from threads.

Thank You!

1

There are 1 best solutions below

0
Icerewl On

I do not know what was the problem but for the people who might have a similar bug I wrote a completely different zip function from scratch and this works fine.

        elif action == "just_zip":
        source_with_third = os.path.join(self.source_folder, third_input)

        all_files = []
        for root, _, files in os.walk(source_with_third):
            all_files.extend([os.path.join(root, file) for file in files])

        if not all_files:
            self.logger.error("Source folder looks empty. Check if the source path is correct")
            self.update_status("Source folder looks empty. Check if the source path is correct", is_error=True)
        else:

            for file_path in all_files:
                filename = os.path.basename(file_path)
                if not re.match(pattern, filename[:30]):
                    send2trash(file_path.replace("/","\\"))

            zip_files_exist = any(file.endswith(".zip") for file in all_files)
            if zip_files_exist:
                self.logger.error("ERROR, FILES ARE ALREADY ZIPPED YOU CAN TRY TO MOVE OR COPY THEM")
                self.update_status("Files are already zipped you can try to move or copy them", is_error=True)
            else:
                grouped_files = {}
                for file_path in all_files:
                    base_name = os.path.basename(file_path)
                    prefix = base_name[:30]
                    if prefix not in grouped_files:
                        grouped_files[prefix] = []
                    grouped_files[prefix].append(file_path)

                total_files = sum(len(files) for files in grouped_files.values())
                processed_files = 0

                for prefix, files in grouped_files.items():
                    if len(files) > 1:
                        zip_path = os.path.dirname(files[0])
                        zip_filename = os.path.join(zip_path, f"{prefix}.zip")
                        try:
                            with zipfile.ZipFile(zip_filename, "w") as zipf:
                                for file in files:
                                    try:
                                        zipf.write(file, os.path.relpath(file, zip_path))
                                    except Exception as e:
                                        self.logger.error(f"Error zipping {file}: {e}")
                                        self.update_status(f"Error zipping {file}", is_error=True)
                            self.logger.info(f"Zipped {len(files)} files with prefix '{prefix}'.")

                            for file in files:
                                send2trash(file.replace("/","\\"))
                                self.logger.info(f"Deleted {file}.")
                        except Exception as e:
                            self.logger.error(f"Error creating zip for {prefix}: {e}")
                            self.update_status(f"Error creating zip for {prefix}", is_error=True)
                    else:
                        pass
                    processed_files += len(files)
                    progress = int((processed_files / total_files) * 100)
                    self.progressbar["value"] = progress  # Update the progress bar directly
                    self.root.update_idletasks()
                
                self.mirrored_clock_label.config(text="Last Zip Time:\n" + datetime.now().strftime("%Y-%m-%d \n %H:%M:%S"))
                self.logger.info("Files zipped successfully on %s", self.source_folder)
                self.update_status("Files have been zipped SUCCESSFULLY", is_error=False)