ProjetsPython/HandBrake_recursive/HandBrake_recursive.py

563 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import sys
import mimetypes
import subprocess
import shutil
import math
import json
import argparse
import ffmpeg
from datetime import datetime
import pydbus as dbus
import openpyxl
from openpyxl.styles import PatternFill
from openpyxl.formatting.rule import CellIsRule
date = datetime.now().strftime("%Y-%m-%d_%Hh%M")
def inhibit_arret(application, raison):
inhibiteurs = {}
# Inhibition arrêt et veille via login1 (SystemBus)
system_bus = dbus.SystemBus()
login1 = system_bus.get("org.freedesktop.login1", "/org/freedesktop/login1")
inhibiteurs["shutdown"] = login1.Inhibit(
"shutdown",
application,
raison,
"block"
)
inhibiteurs["sleep"] = login1.Inhibit(
"sleep",
application,
raison,
"block"
)
# Inhibition déconnexion via gnome-session (SessionBus)
session_bus = dbus.SessionBus()
session_manager = session_bus.get("org.gnome.SessionManager", "/org/gnome/SessionManager")
# flags:
# 0 = logout only
# 1 = user switch
# 2 = suspend
# 4 = shutdown
# 8 = idle (screensaver, etc)
flags = 0 | 1 | 2 | 4 | 8
inhibiteurs["session"] = session_manager.Inhibit(
application,
0,
raison,
flags
)
return inhibiteurs
def lever_inhibit_arret(inhibiteurs):
# Fermer le descripteur de fichier pour lever l'inhibition
for cle, inhibiteur in inhibiteurs.items():
if cle in ["shutdown", "sleep"] and hasattr(inhibiteur, "close"):
inhibiteur.close()
elif cle == "session":
# Connexion au bus session
session_bus = dbus.SessionBus()
# Obtenir l'interface SessionManager de gnome
session_manager = session_bus.get("org.gnome.SessionManager", "/org/gnome/SessionManager")
session_manager.Uninhibit(inhibiteur)
def relancer_avec_systemd():
# Constitue la commande avec systemd-run
cmd = [
"systemd-run",
"--unit=handbrake_recursive",
"--user",
"--description=Encodage vidéo",
f'--working-directory={os.getcwd()}',
"--",
sys.argv[0]
]
# Ajoute les arguments sauf --service et --pager (journalctl utilise déjà un pager)
cmd += [arg for arg in sys.argv[1:] if arg not in ("--service", "--pager")]
print(f"Lancement du script via systemd:\n{' '.join(cmd)}")
print(f"Pour suivre l'exécution du programme:\n journalctl --user -f -u handbrake_recursive")
# Remplace le processus courant
os.execvp(cmd[0], cmd)
# Définir la largeur de chaque colonne à environ 3 cm (approximativement 10.5 unités)
def set_column_width(ws, column_range, width=15):
for col in column_range:
ws.column_dimensions[col].width = width
def export_xlsx(lignes, fichier_sortie="résumé.xlsx"):
# Charger le classeur existant ou en créer un nouveau
if os.path.exists(fichier_sortie):
wb = openpyxl.load_workbook(fichier_sortie)
else:
wb = openpyxl.Workbook()
# Supprimer la feuille par défaut si on ajoute une nouvelle
default_sheet = wb.active
wb.remove(default_sheet)
# Créer une nouvelle feuille
ws = wb.create_sheet(title=date)
# En-têtes
headers = ["Fichier", "Codec", "Résolution", "Action", "Durée", "Taille SRC", "Bitrate SRC",
"Taille DES", "Bitrate DES", "Gain %", "Résultat"]
ws.append(headers)
ws.auto_filter.ref = "A1:K1"
ws.freeze_panes = "B2"
# Appliquer à toutes les colonnes
set_column_width(ws, ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K'])
# Styles pour les cellules
styles = {
"Réussi": PatternFill(start_color="d4f5dd", end_color="d4f5dd", fill_type="solid"),
"Erreur": PatternFill(start_color="f8d7da", end_color="f8d7da", fill_type="solid"),
"Interrompu": PatternFill(start_color="fdd692", end_color="fdd692", fill_type="solid"),
"Existant": PatternFill(start_color="dbeff7", end_color="dbeff7", fill_type="solid"),
"Simulé": PatternFill(start_color="d1e7ff", end_color="d1e7ff", fill_type="solid"),
"Réencodé": PatternFill(start_color="e9d5e5", end_color="e9d5e5", fill_type="solid"),
"Copié": PatternFill(start_color="fff3cd", end_color="fff3cd", fill_type="solid"),
"Gain > 50%": PatternFill(start_color="d4f5dd", end_color="d4f5dd", fill_type="solid"),
"Gain > 30%": PatternFill(start_color="fff3cd", end_color="fff3cd", fill_type="solid"),
"Gain < 30%": PatternFill(start_color="f8d7da", end_color="f8d7da", fill_type="solid"),
"Taille <": PatternFill(start_color="d4f5dd", end_color="d4f5dd", fill_type="solid"),
"Taille =": PatternFill(start_color="fff3cd", end_color="fff3cd", fill_type="solid"),
"Taille >": PatternFill(start_color="f8d7da", end_color="f8d7da", fill_type="solid"),
}
# Données
for ligne in lignes:
file, codec, resolution, action, duration, input_size, output_size, input_bitrate, output_bitrate, result = ligne
gain_percent = calc_gain_percent(input_size, output_size, raw=True)
row = [
file, # string
codec, # string
resolution, # string
action, # string
duration / 86400 if isinstance(duration, (float, int)) else duration, # float fraction de jour
input_size / 1024 ** 2, # int (octets)
input_bitrate, # int
output_size / 1024 ** 2, # int
output_bitrate, # int
gain_percent, # float (vrai pourcentage)
result # string
]
ws.append(row)
nb_lignes = ws.max_row
for row in ws.iter_rows(min_row=2, max_col=len(headers)): # skip header
row[4].number_format = '[h]"h"mm"m"ss"s"' # Excel-style format
row[5].number_format = '#,##0 "Mo"' # Taille SRC
row[6].number_format = '#,##0 "kb/s"' # Bitrate SRC
row[7].number_format = '#,##0 "Mo"' # Taille DES
row[8].number_format = '#,##0 "kb/s"' # Bitrate DES
row[9].number_format = '0.0%' # Gain %
# Appliquer couleurs sur colonne Action (colonne D = index 3)
for row in ws.iter_rows(min_row=2, min_col=4, max_col=4, max_row=ws.max_row):
cell = row[0]
action = cell.value
if action in styles:
for col_index in [0, 1, 2, 3, 4]: # Colonnes Fichier, Codec, Résolution, Action, Durée
ws.cell(row=cell.row, column=col_index + 1).fill = styles[action]
# Appliquer couleurs sur colonne Résultat (colonne K = index 10)
for row in ws.iter_rows(min_row=2, min_col=11, max_col=11, max_row=ws.max_row):
cell = row[0]
resultat = cell.value
if resultat in styles:
cell.fill = styles[resultat]
# Ajouter la mise en forme conditionnelle
# Taille DES
ws.conditional_formatting.add(f'H2:H{nb_lignes}', CellIsRule(operator='lessThan', formula=[f'F2:F{nb_lignes}'], stopIfTrue=True, fill=styles["Taille <"]))
ws.conditional_formatting.add(f'H2:H{nb_lignes}', CellIsRule(operator='equal', formula=[f'F2:F{nb_lignes}'], stopIfTrue=True, fill=styles["Taille ="]))
ws.conditional_formatting.add(f'H2:H{nb_lignes}', CellIsRule(operator='greaterThan', formula=[f'F2:F{nb_lignes}'], stopIfTrue=True, fill=styles["Taille >"]))
# Gain %
ws.conditional_formatting.add(f'J2:J{nb_lignes}', CellIsRule(operator='greaterThan', formula=['50%'], stopIfTrue=True, fill=styles["Gain > 50%"]))
ws.conditional_formatting.add(f'J2:J{nb_lignes}', CellIsRule(operator='greaterThan', formula=['30%'], stopIfTrue=True, fill=styles["Gain > 30%"]))
ws.conditional_formatting.add(f'J2:J{nb_lignes}', CellIsRule(operator='lessThan', formula=['30%'], stopIfTrue=True, fill=styles["Gain < 30%"]))
# Sauvegarder le fichier
wb.save(fichier_sortie)
def human_readable_size(size_bytes):
"""Convertit une taille en octets en une taille lisible (Ko, Mo, Go)."""
if size_bytes == 0:
return "0 B"
size_name = ("B", "KB", "MB", "GB", "TB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{s} {size_name[i]}"
def format_duration(seconds):
"""Formate la durée en h/m/s lisible"""
if not isinstance(seconds, (float, int)) or seconds <= 0:
return ""
else:
return f"{int(seconds // 3600)}h" * (seconds >= 3600) + f"{int((seconds % 3600) // 60)}m" * (seconds >= 60) + f"{int(seconds % 60)}s"
def calc_ratio(size, duration):
"""Calcule le bitrate approximatif (octets/sec)."""
return str(int(size / duration)) if duration else ""
def calc_gain_percent(input_size, output_size, raw=False):
if input_size > 0 and output_size > 0:
value = (input_size - output_size) / input_size
return value if raw else value * 100
return 0
def check_file_exists(file_path):
"""Vérifie si un fichier existe."""
if not os.path.exists(file_path):
print(f"\033[91mErreur : Le fichier {file_path} n'existe pas.\033[0m")
sys.exit(1)
def check_directory_existence(directory_path, create_if_missing=False):
"""Vérifie si un dossier existe et le crée si nécessaire."""
if not os.path.exists(directory_path):
if create_if_missing:
try:
os.makedirs(directory_path)
print(f"Dossier créé : {directory_path}")
except OSError as e:
print(f"\033[91mErreur : Impossible de créer le dossier {directory_path}. {e}\033[0m")
sys.exit(1)
else:
print(f"\033[91mErreur : Le dossier {directory_path} n'existe pas.\033[0m")
sys.exit(1)
def check_directory_writable(directory_path):
"""Vérifie si un dossier est accessible en écriture."""
if not os.access(directory_path, os.W_OK):
print(f"\033[91mErreur : Le dossier {directory_path} n'est pas accessible en écriture.\033[0m")
sys.exit(1)
def get_video_duration(file_path):
"""Retourne la durée d'une vidéo en secondes."""
try:
probe = ffmpeg.probe(file_path)
return float(probe['format']['duration'])
except Exception as e:
print(f"\033[91mErreur lors de la récupération de la durée pour {file_path}: {e}\033[0m")
return 0.0
def get_video_codec(file_path):
"""Retourne le codec vidéo d'un fichier avec ffmpeg."""
try:
probe = ffmpeg.probe(file_path)
video_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
if video_stream:
return video_stream['codec_name']
return "inconnu"
except ffmpeg.Error as e:
print(f"\033[91mErreur lors de l'analyse du codec vidéo pour {file_path}: {e}\033[0m")
return "inconnu"
def get_video_resolution(file_path):
"""Retourne la résolution de la vidéo au format 720p, 1080p, etc."""
try:
probe = ffmpeg.probe(file_path)
height = next((stream['height'] for stream in probe['streams'] if stream['codec_type'] == 'video'), None)
return f"{height}p" if height else ""
except Exception as e:
print(f"\033[91mErreur lors de l'analyse de la résolution vidéo pour {file_path}: {e}\033[0m")
return ""
def get_video_bitrate(file_path):
"""Récupère le bitrate vidéo d'un fichier via ffmpeg."""
try:
probe = ffmpeg.probe(file_path)
for stream in probe['streams']:
if stream['codec_type'] == 'video':
# 1. Priorité : le champ 'bit_rate' s'il est présent
if 'bit_rate' in stream:
return int(stream['bit_rate']) // 1000
# 2. Sinon : on tente d'utiliser 'stream_size' s'il existe
elif 'stream_size' in stream:
return int((int(stream.get('stream_size')) * 8) / float(probe['format']['duration']) / 1000)
# 3. Fallback : calcul à partir de la taille du fichier entier (moins précis)
else:
return int((os.path.getsize(file_path) * 8) / float(probe['format']['duration']) / 1000)
except Exception as e:
print(f"\033[91mErreur lors de l'analyse du bitrate vidéo pour {file_path}: {e}\033[0m")
return 0
def execute_command(command, dry_run=False):
"""Exécute ou simule une commande en ligne."""
if dry_run:
print(f"Commande simulée : {' '.join(command)}")
result = "Simulé"
else:
print(f"Commande exécuté : {' '.join(command)}")
subprocess.run(command, check=True)
result = "Réussi"
return result
def copyfile(file, destination, dry_run=False):
"""Copie ou simule une copie de fichier."""
if dry_run:
result = "Simulé"
else:
shutil.copyfile(file, destination)
result = "Réussi"
return result
def find_video_files(directory):
"""Trouve tous les fichiers vidéo dans un dossier."""
video_files = []
for root, dirs, files in os.walk(directory):
for file in files:
file_path = os.path.join(root, file)
mime_type, _ = mimetypes.guess_type(file_path)
if mime_type and mime_type.startswith('video'):
video_files.append(os.path.relpath(file_path, directory))
return video_files
def get_preset_name_from_file(preset_file):
"""Extrait le nom du preset à partir du fichier JSON."""
try:
with open(preset_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return data["PresetList"][0]["PresetName"]
except Exception as e:
print(f"\033[91mErreur : Impossible de lire le preset name depuis {preset_file} : {e}\033[0m")
sys.exit(1)
def afficher_tableau(lignes, use_pager=False):
buffer = []
# En-tête avec colonnes ajustées
header = (
f"{'Fichier':<100} | "
f"{'Codec':<10} | "
f"{'Résolution':<10} | "
f"{'Action':<10} | "
f"{'Durée':<10} | "
f"{'Taille SRC':<10} | "
f"{'Bitrate SRC':<12} | "
f"{'Taille DES':<10} | "
f"{'Bitrate DES':<12} | "
f"{'Gain %':<6} | "
f"{'Résultat':<10} |"
)
# Calculer la longueur de la ligne de séparation en fonction de la longueur de l'en-tête
separator_length = len(header)
# En-tête et ligne de séparation
buffer.append("\nRésumé des opérations :")
buffer.append(header)
buffer.append("-" * separator_length)
for ligne in lignes:
file, codec, resolution, action, duration, input_size, output_size, input_bitrate, output_bitrate, result = ligne
gain_percent = calc_gain_percent(input_size, output_size)
action_color = ""
result_color = ""
reset_color = "\033[0m"
# Couleurs d'action
if action == "Réencodé":
action_color = "\033[1;35m" # Magenta gras
elif action == "Copié":
action_color = "\033[1;33m" # Jaune gras
# Couleurs de résultat
if result == "Réussi":
result_color = "\033[1;32m" # Vert gras
elif result == "Erreur":
result_color = "\033[1;31m" # Rouge gras
elif result == "Interrompu":
result_color = "\033[1;38;5;208m" # Orange gras
elif result == "Existant":
result_color = "\033[1;36m" # Cyan gras
elif result == "Simulé":
result_color = "\033[1;34m" # Bleu vif gras
# Couleur de la taille destination selon gain ou perte
if output_size < input_size:
output_size_color = "\033[1;32m" # Vert gras (gain)
elif output_size == input_size:
output_size_color = "\033[1;38;5;208m" # Orange gras (égal)
else:
output_size_color = "\033[1;31m" # Rouge gras (perte)
# Définir une couleur selon le gain
# > 50% => vert, > 30% => orange, sinon rouge
if gain_percent > 50:
gain_color = "\033[1;32m" # Vert gras
elif gain_percent > 30:
gain_color = "\033[1;38;5;208m" # Orange gras
else:
gain_color = "\033[1;31m" # Rouge gras
buffer.append(
f"{action_color}{file[:100]:<100}{reset_color} | " # Fichier
f"{action_color}{codec[:10]:<10}{reset_color} | " # Codec
f"{action_color}{resolution[:10]:>10}{reset_color} | " # Résolution
f"{action_color}{action[:10]:<10}{reset_color} | " # Action
f"{action_color}{format_duration(duration)[:10]:>10}{reset_color} | " # Durée
f"{human_readable_size(input_size)[:10]:>10} | " # Taille SRC
f"{(f'{input_bitrate} kb/s' if isinstance(input_bitrate, (float, int)) else '')[:12]:>12} | " # Bitrate SRC
f"{output_size_color}{human_readable_size(output_size)[:10]:>10}{reset_color} | " # Taille DES
f"{(f'{output_bitrate} kb/s' if isinstance(output_bitrate, (float, int)) else '')[:12]:>12} | " # Bitrate DES
f"{gain_color}{gain_percent:>5.1f}%{reset_color} | " # Gain %
f"{result_color}{result[:10]:<10}{reset_color} |" # Résultat
)
tableau = "\n".join(buffer)
if use_pager:
subprocess.run(["less", "-SR"], input=tableau.encode('utf-8'))
else:
print(tableau)
def main():
# Configuration des arguments en ligne de commande
parser = argparse.ArgumentParser(description='Traitement des fichiers vidéo avec HandBrakeCLI.')
parser.add_argument('-p', '--preset', required=True, help='Fichier preset JSON à utiliser avec HandBrakeCLI')
parser.add_argument('-i', '--input_directory', required=True, help='Dossier dentrée contenant les fichiers vidéo')
parser.add_argument('-o', '--output_directory', required=True, help='Dossier de sortie pour les fichiers traités')
parser.add_argument('--dry-run', action='store_true', help='Simule les actions sans modifier les fichiers')
parser.add_argument('--pager', action='store_true', help='Utilise less pour paginer le résumé final')
parser.add_argument("--service", action="store_true", help="Lancer via systemd-run")
args = parser.parse_args()
if args.service:
if "INVOCATION_ID" not in os.environ:
relancer_avec_systemd()
# Récupération des arguments
preset_file = args.preset
input_directory = args.input_directory
output_directory = args.output_directory
dry_run = args.dry_run
use_pager = args.pager
# Vérification de l'existence des fichiers et dossiers
check_file_exists(preset_file)
check_directory_existence(input_directory)
if not dry_run:
check_directory_existence(output_directory, create_if_missing=True)
check_directory_writable(output_directory)
# Récupération du nom du preset
preset_name = get_preset_name_from_file(preset_file)
# Recherche des fichiers vidéo dans le dossier d'entrée
video_files = find_video_files(input_directory)
# Liste pour enregistrer le résumé
lignes = []
inhibiteurs = inhibit_arret( "HandBrake Script", "Encodage vidéo en cours")
try:
# Traitement de chaque fichier vidéo
for video_file in video_files:
input_file_path = os.path.join(input_directory, video_file)
output_file_path = os.path.join(output_directory, video_file)
if not dry_run: check_directory_existence(os.path.dirname(output_file_path), create_if_missing=True)
# Récupération du codec du fichier vidéo
codec = get_video_codec(input_file_path)
# Choix de l'action
if codec == "hevc":
action = "Copié"
output_file_path = os.path.join(output_directory, video_file)
else:
action = "Réencodé"
output_file_path = os.path.splitext(os.path.join(output_directory, video_file))[0] + '.mp4'
try:
# Si le fichier existe déjà
if os.path.exists(output_file_path):
result = "Existant"
print(f"\033[96mFichier existant : {input_file_path} vers {output_file_path} (action aurait été : {action})\033[0m")
elif action == "Réencodé":
print(f"\033[95mRéencodage de {input_file_path} vers {output_file_path} (Non-HEVC détecté)\033[0m")
command = ['HandBrakeCLI', '--preset-import-file', preset_file, '-Z', preset_name, '-i',
input_file_path, '-o', output_file_path]
result = execute_command(command, dry_run)
elif action == "Copié":
print(f"\033[93mCopie de {input_file_path} vers {output_file_path} (HEVC détecté)\033[0m")
result = copyfile(input_file_path, output_file_path, dry_run)
except KeyboardInterrupt:
print("\033[91mInterruption clavier détectée.\033[0m")
if os.path.exists(output_file_path):
os.remove(output_file_path)
result = "Interrompu"
raise
except Exception as e:
print(f"\033[91mErreur lors du traitement de {input_file_path} : {e}\033[0m")
if os.path.exists(output_file_path):
os.remove(output_file_path)
result = "Erreur"
finally:
# Enregistrer le résultat même en cas d'erreur ou interruption
# Récupère des informations du fichier de départ
resolution = get_video_resolution(input_file_path)
input_size = os.path.getsize(input_file_path)
input_bitrate = get_video_bitrate(input_file_path)
# Récupère des informations du fichier final
output_size = os.path.getsize(output_file_path) if os.path.exists(output_file_path) else 0
output_bitrate = get_video_bitrate(output_file_path) if os.path.exists(output_file_path) else ""
duration = get_video_duration(input_file_path)
lignes.append((video_file, codec, resolution, action, duration, input_size, output_size, input_bitrate, output_bitrate, result))
except KeyboardInterrupt:
pass
finally:
# Calcul des tailles des dossiers source et destination (après traitement)
input_total = sum(r[5] for r in lignes)
output_total = sum(r[6] for r in lignes)
# Insertion d'une ligne récapitulative des tailles des dossiers
lignes.insert(0, (
"[Taille Totale]", # Fichier
"", # Codec
"", # Résolution
"", # Action
"", # Durée
input_total, # Taille SRC
output_total, # Taille DES
"", # Bitrate SRC
"", # Bitrate DES
"" # Résultat
))
xlsx_file = "résumé.xlsx"
export_xlsx(lignes, fichier_sortie=xlsx_file)
print(f"📝 Classeur généré : {xlsx_file}")
lever_inhibit_arret(inhibiteurs)
# Affichage du résumé
afficher_tableau(lignes, use_pager=use_pager)
if __name__ == "__main__":
main()