#!/usr/bin/env python3 import os import sys import mimetypes import subprocess import shutil import math import json import argparse import ffmpeg import pydbus as dbus import openpyxl from openpyxl.styles import PatternFill from openpyxl.formatting.rule import CellIsRule def inhibit_arret(application, raison): inhibiteurs = {} # Inhibition arrêt et veille via login1 (SystemBus) system_bus = dbus.SystemBus() login1 = system_bus.get("org.freedesktop.login1", "/org/freedesktop/login1") inhibiteurs["shutdown"] = login1.Inhibit( "shutdown", application, raison, "block" ) inhibiteurs["sleep"] = login1.Inhibit( "sleep", application, raison, "block" ) # Inhibition déconnexion via gnome-session (SessionBus) session_bus = dbus.SessionBus() session_manager = session_bus.get("org.gnome.SessionManager", "/org/gnome/SessionManager") # flags: # 0 = logout only # 1 = user switch # 2 = suspend # 4 = shutdown # 8 = idle (screensaver, etc) flags = 0 | 1 | 2 | 4 | 8 inhibiteurs["session"] = session_manager.Inhibit( application, 0, raison, flags ) return inhibiteurs def lever_inhibit_arret(inhibiteurs): # Fermer le descripteur de fichier pour lever l'inhibition for cle, inhibiteur in inhibiteurs.items(): if cle in ["shutdown", "sleep"] and hasattr(inhibiteur, "close"): inhibiteur.close() elif cle == "session": # Connexion au bus session session_bus = dbus.SessionBus() # Obtenir l'interface SessionManager de gnome session_manager = session_bus.get("org.gnome.SessionManager", "/org/gnome/SessionManager") session_manager.Uninhibit(inhibiteur) def relancer_avec_systemd(): # Constitue la commande avec systemd-run cmd = [ "systemd-run", "--unit=handbrake_recursive", "--user", "--description=Encodage vidéo", f'--working-directory={os.getcwd()}', "--", sys.argv[0] ] # Ajoute les arguments sauf --service et --pager (journalctl utilise déjà un pager) cmd += [arg for arg in sys.argv[1:] if arg not in ("--service", "--pager")] print(f"Lancement du script via systemd:\n{' '.join(cmd)}") print(f"Pour suivre l'exécution du programme:\n journalctl --user -f -u handbrake_recursive") # Remplace le processus courant os.execvp(cmd[0], cmd) # Définir la largeur de chaque colonne à environ 3 cm (approximativement 10.5 unités) def set_column_width(ws, column_range, width=15): for col in column_range: ws.column_dimensions[col].width = width def export_xlsx(lignes, fichier_sortie="résumé.xlsx"): # Créer un nouveau classeur et sélectionner la feuille active wb = openpyxl.Workbook() ws = wb.active ws.title = "Résumé" # En-têtes headers = ["Fichier", "Codec", "Résolution", "Action", "Durée", "Taille SRC", "Bitrate SRC", "Taille DES", "Bitrate DES", "Gain %", "Résultat"] ws.append(headers) ws.auto_filter.ref = "A1:K1" ws.freeze_panes = "B2" # Appliquer à toutes les colonnes set_column_width(ws, ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K']) # Styles pour les cellules styles = { "Réussi": PatternFill(start_color="d4f5dd", end_color="d4f5dd", fill_type="solid"), "Erreur": PatternFill(start_color="f8d7da", end_color="f8d7da", fill_type="solid"), "Interrompu": PatternFill(start_color="fdd692", end_color="fdd692", fill_type="solid"), "Existant": PatternFill(start_color="dbeff7", end_color="dbeff7", fill_type="solid"), "Simulé": PatternFill(start_color="d1e7ff", end_color="d1e7ff", fill_type="solid"), "Réencodé": PatternFill(start_color="e9d5e5", end_color="e9d5e5", fill_type="solid"), "Copié": PatternFill(start_color="fff3cd", end_color="fff3cd", fill_type="solid"), "Gain > 50%": PatternFill(start_color="d4f5dd", end_color="d4f5dd", fill_type="solid"), "Gain > 30%": PatternFill(start_color="fff3cd", end_color="fff3cd", fill_type="solid"), "Gain < 30%": PatternFill(start_color="f8d7da", end_color="f8d7da", fill_type="solid"), "Taille <": PatternFill(start_color="d4f5dd", end_color="d4f5dd", fill_type="solid"), "Taille =": PatternFill(start_color="fff3cd", end_color="fff3cd", fill_type="solid"), "Taille >": PatternFill(start_color="f8d7da", end_color="f8d7da", fill_type="solid"), } # Données for ligne in lignes: file, codec, resolution, action, duration, input_size, output_size, input_bitrate, output_bitrate, result = ligne gain_percent = calc_gain_percent(input_size, output_size, raw=True) row = [ file, # string codec, # string resolution, # string action, # string duration / 86400 if isinstance(duration, (float, int)) else duration, # float fraction de jour input_size / 1024 ** 2, # int (octets) input_bitrate, # int output_size / 1024 ** 2, # int output_bitrate, # int gain_percent, # float (vrai pourcentage) result # string ] ws.append(row) nb_lignes = ws.max_row for row in ws.iter_rows(min_row=2, max_col=len(headers)): # skip header row[4].number_format = '[h]"h"mm"m"ss"s"' # Excel-style format row[5].number_format = '#,##0 "Mo"' # Taille SRC row[6].number_format = '#,##0 "kb/s"' # Bitrate SRC row[7].number_format = '#,##0 "Mo"' # Taille DES row[8].number_format = '#,##0 "kb/s"' # Bitrate DES row[9].number_format = '0.0%' # Gain % # Appliquer couleurs sur colonne Action (colonne D = index 3) for row in ws.iter_rows(min_row=2, min_col=4, max_col=4, max_row=ws.max_row): cell = row[0] action = cell.value if action in styles: for col_index in [0, 1, 2, 3, 4]: # Colonnes Fichier, Codec, Résolution, Action, Durée ws.cell(row=cell.row, column=col_index + 1).fill = styles[action] # Appliquer couleurs sur colonne Résultat (colonne K = index 10) for row in ws.iter_rows(min_row=2, min_col=11, max_col=11, max_row=ws.max_row): cell = row[0] resultat = cell.value if resultat in styles: cell.fill = styles[resultat] # Ajouter la mise en forme conditionnelle # Taille DES ws.conditional_formatting.add(f'H2:H{nb_lignes}', CellIsRule(operator='lessThan', formula=[f'F2:F{nb_lignes}'], stopIfTrue=True, fill=styles["Taille <"])) ws.conditional_formatting.add(f'H2:H{nb_lignes}', CellIsRule(operator='equal', formula=[f'F2:F{nb_lignes}'], stopIfTrue=True, fill=styles["Taille ="])) ws.conditional_formatting.add(f'H2:H{nb_lignes}', CellIsRule(operator='greaterThan', formula=[f'F2:F{nb_lignes}'], stopIfTrue=True, fill=styles["Taille >"])) # Gain % ws.conditional_formatting.add(f'J2:J{nb_lignes}', CellIsRule(operator='greaterThan', formula=['50%'], stopIfTrue=True, fill=styles["Gain > 50%"])) ws.conditional_formatting.add(f'J2:J{nb_lignes}', CellIsRule(operator='greaterThan', formula=['30%'], stopIfTrue=True, fill=styles["Gain > 30%"])) ws.conditional_formatting.add(f'J2:J{nb_lignes}', CellIsRule(operator='lessThan', formula=['30%'], stopIfTrue=True, fill=styles["Gain < 30%"])) # Sauvegarder le fichier wb.save(fichier_sortie) def human_readable_size(size_bytes): """Convertit une taille en octets en une taille lisible (Ko, Mo, Go).""" if size_bytes == 0: return "0 B" size_name = ("B", "KB", "MB", "GB", "TB") i = int(math.floor(math.log(size_bytes, 1024))) p = math.pow(1024, i) s = round(size_bytes / p, 2) return f"{s} {size_name[i]}" def format_duration(seconds): """Formate la durée en h/m/s lisible""" if not isinstance(seconds, (float, int)) or seconds <= 0: return "" else: return f"{int(seconds // 3600)}h" * (seconds >= 3600) + f"{int((seconds % 3600) // 60)}m" * (seconds >= 60) + f"{int(seconds % 60)}s" def calc_ratio(size, duration): """Calcule le bitrate approximatif (octets/sec).""" return str(int(size / duration)) if duration else "" def calc_gain_percent(input_size, output_size, raw=False): if input_size > 0 and output_size > 0: value = (input_size - output_size) / input_size return value if raw else value * 100 return 0 def check_file_exists(file_path): """Vérifie si un fichier existe.""" if not os.path.exists(file_path): print(f"\033[91mErreur : Le fichier {file_path} n'existe pas.\033[0m") sys.exit(1) def check_directory_existence(directory_path, create_if_missing=False): """Vérifie si un dossier existe et le crée si nécessaire.""" if not os.path.exists(directory_path): if create_if_missing: try: os.makedirs(directory_path) print(f"Dossier créé : {directory_path}") except OSError as e: print(f"\033[91mErreur : Impossible de créer le dossier {directory_path}. {e}\033[0m") sys.exit(1) else: print(f"\033[91mErreur : Le dossier {directory_path} n'existe pas.\033[0m") sys.exit(1) def check_directory_writable(directory_path): """Vérifie si un dossier est accessible en écriture.""" if not os.access(directory_path, os.W_OK): print(f"\033[91mErreur : Le dossier {directory_path} n'est pas accessible en écriture.\033[0m") sys.exit(1) def get_video_duration(file_path): """Retourne la durée d'une vidéo en secondes.""" try: probe = ffmpeg.probe(file_path) return float(probe['format']['duration']) except Exception as e: print(f"\033[91mErreur lors de la récupération de la durée pour {file_path}: {e}\033[0m") return 0.0 def get_video_codec(file_path): """Retourne le codec vidéo d'un fichier avec ffmpeg.""" try: probe = ffmpeg.probe(file_path) video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None) if video_stream: return video_stream['codec_name'] return "inconnu" except ffmpeg.Error as e: print(f"\033[91mErreur lors de l'analyse du codec vidéo pour {file_path}: {e}\033[0m") return "inconnu" def get_video_resolution(file_path): """Retourne la résolution de la vidéo au format 720p, 1080p, etc.""" try: probe = ffmpeg.probe(file_path) height = next((stream['height'] for stream in probe['streams'] if stream['codec_type'] == 'video'), None) return f"{height}p" if height else "" except Exception as e: print(f"\033[91mErreur lors de l'analyse de la résolution vidéo pour {file_path}: {e}\033[0m") return "" def get_video_bitrate(file_path): """Récupère le bitrate vidéo d'un fichier via ffmpeg.""" try: probe = ffmpeg.probe(file_path) for stream in probe['streams']: if stream['codec_type'] == 'video': # 1. Priorité : le champ 'bit_rate' s'il est présent if 'bit_rate' in stream: return int(stream['bit_rate']) // 1000 # 2. Sinon : on tente d'utiliser 'stream_size' s'il existe elif 'stream_size' in stream: return int((int(stream.get('stream_size')) * 8) / float(probe['format']['duration']) / 1000) # 3. Fallback : calcul à partir de la taille du fichier entier (moins précis) else: return int((os.path.getsize(file_path) * 8) / float(probe['format']['duration']) / 1000) except Exception as e: print(f"\033[91mErreur lors de l'analyse du bitrate vidéo pour {file_path}: {e}\033[0m") return 0 def execute_command(command, dry_run=False): """Exécute ou simule une commande en ligne.""" if dry_run: print(f"Commande simulée : {' '.join(command)}") result = "Simulé" else: print(f"Commande exécuté : {' '.join(command)}") subprocess.run(command, check=True) result = "Réussi" return result def copyfile(file, destination, dry_run=False): """Copie ou simule une copie de fichier.""" if dry_run: result = "Simulé" else: shutil.copyfile(file, destination) result = "Réussi" return result def find_video_files(directory): """Trouve tous les fichiers vidéo dans un dossier.""" video_files = [] for root, dirs, files in os.walk(directory): for file in files: file_path = os.path.join(root, file) mime_type, _ = mimetypes.guess_type(file_path) if mime_type and mime_type.startswith('video'): video_files.append(os.path.relpath(file_path, directory)) return video_files def get_preset_name_from_file(preset_file): """Extrait le nom du preset à partir du fichier JSON.""" try: with open(preset_file, 'r', encoding='utf-8') as f: data = json.load(f) return data["PresetList"][0]["PresetName"] except Exception as e: print(f"\033[91mErreur : Impossible de lire le preset name depuis {preset_file} : {e}\033[0m") sys.exit(1) def afficher_tableau(lignes, use_pager=False): buffer = [] # En-tête avec colonnes ajustées header = ( f"{'Fichier':<100} | " f"{'Codec':<10} | " f"{'Résolution':<10} | " f"{'Action':<10} | " f"{'Durée':<10} | " f"{'Taille SRC':<10} | " f"{'Bitrate SRC':<12} | " f"{'Taille DES':<10} | " f"{'Bitrate DES':<12} | " f"{'Gain %':<6} | " f"{'Résultat':<10} |" ) # Calculer la longueur de la ligne de séparation en fonction de la longueur de l'en-tête separator_length = len(header) # En-tête et ligne de séparation buffer.append("\nRésumé des opérations :") buffer.append(header) buffer.append("-" * separator_length) for ligne in lignes: file, codec, resolution, action, duration, input_size, output_size, input_bitrate, output_bitrate, result = ligne gain_percent = calc_gain_percent(input_size, output_size) action_color = "" result_color = "" reset_color = "\033[0m" # Couleurs d'action if action == "Réencodé": action_color = "\033[1;35m" # Magenta gras elif action == "Copié": action_color = "\033[1;33m" # Jaune gras # Couleurs de résultat if result == "Réussi": result_color = "\033[1;32m" # Vert gras elif result == "Erreur": result_color = "\033[1;31m" # Rouge gras elif result == "Interrompu": result_color = "\033[1;38;5;208m" # Orange gras elif result == "Existant": result_color = "\033[1;36m" # Cyan gras elif result == "Simulé": result_color = "\033[1;34m" # Bleu vif gras # Couleur de la taille destination selon gain ou perte if output_size < input_size: output_size_color = "\033[1;32m" # Vert gras (gain) elif output_size == input_size: output_size_color = "\033[1;38;5;208m" # Orange gras (égal) else: output_size_color = "\033[1;31m" # Rouge gras (perte) # Définir une couleur selon le gain # > 50% => vert, > 30% => orange, sinon rouge if gain_percent > 50: gain_color = "\033[1;32m" # Vert gras elif gain_percent > 30: gain_color = "\033[1;38;5;208m" # Orange gras else: gain_color = "\033[1;31m" # Rouge gras buffer.append( f"{action_color}{file[:100]:<100}{reset_color} | " # Fichier f"{action_color}{codec[:10]:<10}{reset_color} | " # Codec f"{action_color}{resolution[:10]:>10}{reset_color} | " # Résolution f"{action_color}{action[:10]:<10}{reset_color} | " # Action f"{action_color}{format_duration(duration)[:10]:>10}{reset_color} | " # Durée f"{human_readable_size(input_size)[:10]:>10} | " # Taille SRC f"{(f'{input_bitrate} kb/s' if isinstance(input_bitrate, (float, int)) else '')[:12]:>12} | " # Bitrate SRC f"{output_size_color}{human_readable_size(output_size)[:10]:>10}{reset_color} | " # Taille DES f"{(f'{output_bitrate} kb/s' if isinstance(output_bitrate, (float, int)) else '')[:12]:>12} | " # Bitrate DES f"{gain_color}{gain_percent:>5.1f}%{reset_color} | " # Gain % f"{result_color}{result[:10]:<10}{reset_color} |" # Résultat ) tableau = "\n".join(buffer) if use_pager: subprocess.run(["less", "-SR"], input=tableau.encode('utf-8')) else: print(tableau) def main(): # Configuration des arguments en ligne de commande parser = argparse.ArgumentParser(description='Traitement des fichiers vidéo avec HandBrakeCLI.') parser.add_argument('-p', '--preset', required=True, help='Fichier preset JSON à utiliser avec HandBrakeCLI') parser.add_argument('-i', '--input_directory', required=True, help='Dossier d’entrée contenant les fichiers vidéo') parser.add_argument('-o', '--output_directory', required=True, help='Dossier de sortie pour les fichiers traités') parser.add_argument('--dry-run', action='store_true', help='Simule les actions sans modifier les fichiers') parser.add_argument('--pager', action='store_true', help='Utilise less pour paginer le résumé final') parser.add_argument("--service", action="store_true", help="Lancer via systemd-run") args = parser.parse_args() if args.service: if "INVOCATION_ID" not in os.environ: relancer_avec_systemd() # Récupération des arguments preset_file = args.preset input_directory = args.input_directory output_directory = args.output_directory dry_run = args.dry_run use_pager = args.pager # Vérification de l'existence des fichiers et dossiers check_file_exists(preset_file) check_directory_existence(input_directory) if not dry_run: check_directory_existence(output_directory, create_if_missing=True) check_directory_writable(output_directory) # Récupération du nom du preset preset_name = get_preset_name_from_file(preset_file) # Recherche des fichiers vidéo dans le dossier d'entrée video_files = find_video_files(input_directory) # Liste pour enregistrer le résumé lignes = [] inhibiteurs = inhibit_arret( "HandBrake Script", "Encodage vidéo en cours") try: # Traitement de chaque fichier vidéo for video_file in video_files: input_file_path = os.path.join(input_directory, video_file) output_file_path = os.path.join(output_directory, video_file) if not dry_run: check_directory_existence(os.path.dirname(output_file_path), create_if_missing=True) # Récupération du codec du fichier vidéo codec = get_video_codec(input_file_path) # Choix de l'action if codec == "hevc": action = "Copié" output_file_path = os.path.join(output_directory, video_file) else: action = "Réencodé" output_file_path = os.path.splitext(os.path.join(output_directory, video_file))[0] + '.mp4' try: # Si le fichier existe déjà if os.path.exists(output_file_path): result = "Existant" print(f"\033[96mFichier existant : {input_file_path} vers {output_file_path} (action aurait été : {action})\033[0m") elif action == "Réencodé": print(f"\033[95mRéencodage de {input_file_path} vers {output_file_path} (Non-HEVC détecté)\033[0m") command = ['HandBrakeCLI', '--preset-import-file', preset_file, '-Z', preset_name, '-i', input_file_path, '-o', output_file_path] result = execute_command(command, dry_run) elif action == "Copié": print(f"\033[93mCopie de {input_file_path} vers {output_file_path} (HEVC détecté)\033[0m") result = copyfile(input_file_path, output_file_path, dry_run) except KeyboardInterrupt: print("\033[91mInterruption clavier détectée.\033[0m") if os.path.exists(output_file_path): os.remove(output_file_path) result = "Interrompu" raise except Exception as e: print(f"\033[91mErreur lors du traitement de {input_file_path} : {e}\033[0m") if os.path.exists(output_file_path): os.remove(output_file_path) result = "Erreur" finally: # Enregistrer le résultat même en cas d'erreur ou interruption # Récupère des informations du fichier de départ resolution = get_video_resolution(input_file_path) input_size = os.path.getsize(input_file_path) input_bitrate = get_video_bitrate(input_file_path) # Récupère des informations du fichier final output_size = os.path.getsize(output_file_path) if os.path.exists(output_file_path) else 0 output_bitrate = get_video_bitrate(output_file_path) if os.path.exists(output_file_path) else "" duration = get_video_duration(input_file_path) lignes.append((video_file, codec, resolution, action, duration, input_size, output_size, input_bitrate, output_bitrate, result)) except KeyboardInterrupt: pass finally: # Calcul des tailles des dossiers source et destination (après traitement) input_total = sum(r[5] for r in lignes) output_total = sum(r[6] for r in lignes) # Insertion d'une ligne récapitulative des tailles des dossiers lignes.insert(0, ( "[Taille Totale]", # Fichier "", # Codec "", # Résolution "", # Action "", # Durée input_total, # Taille SRC output_total, # Taille DES "", # Bitrate SRC "", # Bitrate DES "" # Résultat )) xlsx_file = "résumé.xlsx" export_xlsx(lignes, fichier_sortie=xlsx_file) print(f"📝 Classeur généré : {xlsx_file}") lever_inhibit_arret(inhibiteurs) # Affichage du résumé afficher_tableau(lignes, use_pager=use_pager) if __name__ == "__main__": main()