#!/usr/bin/env python3 import subprocess import json import re import openpyxl import time import tempfile import os import shutil # Configuration hote_proxmox = "proxmox.lan" utilisateur_ssh = "impt" classeur = "proxmox_export_disk" def executer_commande_ssh(commande, entree=None): """Exécute une commande sur le serveur Proxmox via SSH en utilisant sudo.""" commande_ssh = [ "ssh", f"{utilisateur_ssh}@{hote_proxmox}", f"sudo {commande}" ] resultat = subprocess.run(commande_ssh, input=entree, capture_output=True, text=True) # Affiche stderr s’il y a quelque chose dedans if resultat.stderr: print("---- SSH STDERR ----") print(resultat.stderr) if resultat.returncode != 0: return None return resultat.stdout def recuperer_noeuds(): """Récupère la liste des nœuds du cluster Proxmox.""" donnees_noeuds = executer_commande_ssh("pvesh get /nodes --output-format json") return json.loads(donnees_noeuds) if donnees_noeuds else [] def recuperer_vms(noeuds): """Récupère toutes les VM et conteneurs sur un nœud donné.""" vms = [] for noeud in noeuds: nom_noeud = noeud.get("node") for type_vm in ["qemu", "lxc"]: cmd = f"pvesh get /nodes/{nom_noeud}/{type_vm} --output-format json" donnees_vms = executer_commande_ssh(cmd) liste_vm = json.loads(donnees_vms) for vm in liste_vm: vm['vmid'] = int(vm.get('vmid')) vms.extend(liste_vm) return vms def recuperer_configs_vms(vms): """Récupère les configurations de toutes les VMs/CTs en une seule commande SSH (retour JSON).""" # Contenu du script Python à exécuter côté serveur script = """ import json, sys res={} for i in json.load(sys.stdin): v = i.get('vmid') t = i.get('type') c = f'/etc/pve/{t}/{v}.conf' res[v]=open(c).read() print(json.dumps(res)) """ # Préparer les données avec les VMID et les types donnees = [{'vmid': vm.get('vmid'), 'type': vm.get('type', 'qemu-server')} for vm in vms] # On prépare la commande SSH qui exécute le script Python commande = f'python3 -c "{script}"' # On exécute en passant 'data' comme entrée standard du script sortie = executer_commande_ssh(commande, entree=json.dumps(donnees, separators=(",", ":"))) # Parser le JSON retourné try: configs_vms = json.loads(sortie) return configs_vms except json.JSONDecodeError as e: print("Erreur lors du décodage du JSON de configurations des VMs :", e) return {} def recuperer_infos_stockage(): """Récupère les informations de configuration du stockage.""" cmd = "cat /etc/pve/storage.cfg" sortie = executer_commande_ssh(cmd) if sortie: infos_stockage = {} stockage_actuel = None for ligne in sortie.splitlines(): if not ligne: stockage_actuel = None elif ligne.startswith('\t'): parties = ligne[1:].split(' ', 1) cle = parties[0].strip() valeur = parties[1].strip() if len(parties) > 1 else True if stockage_actuel: infos_stockage[stockage_actuel][cle] = valeur else: type_stockage, stockage_actuel = ligne.split(': ', 1) infos_stockage[stockage_actuel] = {'type': type_stockage} return infos_stockage def recuperer_info_disques(vms, infos_stockage, configs): """Récupère tous les disques à partir des configurations et des tailles déjà récupérées.""" info_disques = [] for vm in vms: vmid = vm.get("vmid") type_vm = "lxc" if vm.get("type") == "lxc" else "qemu" config = configs.get(str(vmid)) if not config: continue for ligne in config.splitlines(): if "media=cdrom" in ligne or "iso" in ligne: continue elif not ligne: # Arrêter à la première ligne vide break elif re.match(r"^(scsi\d|sata\d|ide\d|mp\d|rootfs)", ligne): try: infos_ligne = ligne.split(":", 1)[1].split(",") nom_stockage, nom_disque = [x.strip() for x in infos_ligne[0].split(":")] point_montage = None if type_vm == "lxc": if re.match(r"^(mp\d)", ligne): point_montage = ligne.split("mp=")[1].split(",")[0] elif ligne.startswith("rootfs"): point_montage = "/" if nom_stockage in infos_stockage: type_stockage = infos_stockage[nom_stockage]['type'] if type_stockage == "lvmthin": # LVM chemin = f"/dev/{infos_stockage[nom_stockage]['vgname']}/{nom_disque}" elif type_stockage == "dir": # Fichier raw ou qcow2 chemin = f"{infos_stockage[nom_stockage]['path']}/images/{nom_disque}" nom_disque = nom_disque.replace(str(vmid) + "/", "") else: continue info_disques.append({'vm': vm, 'chemin': chemin, 'nom_disque': nom_disque, 'nom_stockage': nom_stockage, 'type_stockage': type_stockage, 'point_montage': point_montage}) except (ValueError, AttributeError): print(f"Erreur de parsing de la ligne de configuration : {ligne}") return info_disques def recuperer_taille_disques(info_disques): tailles_disques = {} chemin_disques_lvms = [] chemin_disques_fichiers = [] for info_disque in info_disques: # Récupère la taille d'un disque. if info_disque.get('type_stockage') == "lvmthin": # LVM chemin_disques_lvms.append(info_disque.get('chemin')) elif info_disque.get('type_stockage') == "dir": # Fichier raw ou qcow2 chemin_disques_fichiers.append(info_disque.get('chemin')) tailles_disques.update(recuperer_taille_lvm(chemin_disques_lvms) or {}) tailles_disques.update(recuperer_taille_fichier(chemin_disques_fichiers) or {}) return tailles_disques def recuperer_taille_lvm(chemin_disques): """Analyse les tailles d'un volume logique LVM.""" tailles_disques = {} cmd = f'lvs -o lv_path,lv_size,data_percent --noheadings --units g --separator "|" --nosuffix {" ".join(chemin_disques)}' sortie = executer_commande_ssh(cmd) lignes = sortie.splitlines() # Analyse LVM for ligne in lignes: chemin, taille, pourcentage = ligne.strip().replace(',', '.').split('|') tailles_disques[chemin] = { "taille_totale": float(taille), "pourcentage_utilise": float(pourcentage), "taille_utilisee": float(taille) * float(pourcentage) / 100 } return tailles_disques def recuperer_taille_fichier(chemin_disques): """Analyse les tailles d'une image disque (qcow2/raw).""" tailles_disques = {} for chemin_disque in chemin_disques: cmd = f"qemu-img info {chemin_disque}" sortie = executer_commande_ssh(cmd) if sortie: lignes = sortie.splitlines() taille_virtuelle = None taille_disque = None for ligne in lignes: if 'virtual size:' in ligne: taille_virtuelle = ligne.split(':')[1].strip().split('(')[0].strip() elif 'disk size:' in ligne: taille_disque = ligne.split(':')[1].strip() if taille_virtuelle and taille_disque: valeur_virt, unite_virt = taille_virtuelle.split(' ') valeur_phys, unite_phys = taille_disque.split(' ') taille_virt = float(valeur_virt) * {'GiB': 1, 'MiB': 1/1024, 'KiB': 1/1024/1024}[unite_virt] taille_phys = float(valeur_phys) * {'GiB': 1, 'MiB': 1/1024, 'KiB': 1/1024/1024}[unite_phys] pourcentage_utilise = (taille_phys / taille_virt) * 100 tailles_disques[chemin_disque] = { "taille_totale": taille_virt, "pourcentage_utilise": pourcentage_utilise, "taille_utilisee": taille_phys } return tailles_disques def croisement_info(info_disques, taille_disques): disques = [] for info_disque in info_disques: vm = info_disque.get('vm') taille_disque = taille_disques.get(info_disque.get('chemin')) if taille_disque is None: continue disques.append({ "VMID": vm.get('vmid'), "Nom": vm.get("name"), "Type": vm.get("type", "qemu"), "Disque": info_disque.get('nom_disque'), "Stockage": info_disque.get('nom_stockage'), "Taille Totale (GB)": taille_disque.get('taille_totale'), "% Utilisé": taille_disque.get('pourcentage_utilise') / 100, "Taille Utilisée (GB)": taille_disque.get('taille_utilisee'), "Point de Montage": info_disque.get('point_montage'), }) return disques def exporter_classeur(disques, fichier_sortie): """Exporte les données des disques dans un fichier, en choisissant automatiquement entre XLSX et ODS.""" # Vérifie si LibreOffice est disponible sur le système libreoffice_disponible = subprocess.run( ["which", "soffice"], capture_output=True, text=True ).returncode == 0 # Si LibreOffice est disponible, exporte en ODS, sinon en XLSX if libreoffice_disponible: fichier_sortie_avec_extension = ajouter_extension(fichier_sortie, ".ods") print("LibreOffice est disponible, exportation en ODS...") exporter_vers_ods(disques, fichier_sortie_avec_extension) else: fichier_sortie_avec_extension = ajouter_extension(fichier_sortie, ".xlsx") print("LibreOffice n'est pas disponible, exportation en XLSX...") exporter_vers_xlsx(disques, fichier_sortie_avec_extension) return fichier_sortie_avec_extension def exporter_vers_xlsx(disques, fichier_sortie): """Exporte les données des disques dans un fichier XLSX avec un filtre automatique.""" champs = ["VMID", "Nom", "Type", "Disque", "Stockage", "Taille Totale (GB)", "% Utilisé", "Taille Utilisée (GB)", "Point de Montage"] wb = openpyxl.Workbook() ws = wb.active ws.title = "Disques" ws.append(champs) ws.freeze_panes = "B2" # Tri croissant des disques par VMID disques = sorted(disques, key=lambda x: x.get("VMID")) # Si VMID est absent, on lui attribue la valeur 0 par défaut for disque in disques: ligne = [disque.get(champ, "") for champ in champs] ws.append(ligne) derniere_ligne = ws.max_row ws[f'A{derniere_ligne + 2}'] = f'=SUBTOTAL(103, A2:A{derniere_ligne})' ws[f'H{derniere_ligne + 2}'] = f'=SUBTOTAL(109, H2:H{derniere_ligne})' for ligne in ws.iter_rows(min_row=2, max_row=ws.max_row, max_col=len(champs)): ligne[5].number_format = '#,##0.00 "GB"' ligne[6].number_format = '0.00%' ligne[7].number_format = '#,##0.00 "GB"' ws.auto_filter.ref = f"A1:{ws.cell(row=derniere_ligne, column=len(champs)).coordinate}" for colonne in ws.columns: cellules = [cell for cell in colonne if cell.row <= derniere_ligne] longueur = max(len(str(cell.value)) if cell.value is not None else 0 for cell in cellules) largeur = max(longueur + 2, 8) ws.column_dimensions[colonne[0].column_letter].width = largeur wb.save(fichier_sortie) def exporter_vers_ods(disques, fichier_sortie): """Exporte directement en ODS sans conserver le XLSX sur disque.""" # Crée un dossier temporaire pour le XLSX with tempfile.TemporaryDirectory() as dossier_temp: xlsx_temp = os.path.join(dossier_temp, "temp.xlsx") ods_temp = os.path.join(dossier_temp, "temp.ods") # Crée le XLSX exporter_vers_xlsx(disques, xlsx_temp) # Utilise LibreOffice pour convertir le fichier temporaire commande = [ "soffice", "--headless", "--convert-to", "ods", "--outdir", dossier_temp, xlsx_temp ] subprocess.run(commande, capture_output=True, check=True) # Renomme le fichier généré par LibreOffice au bon nom de sortie chemin_ods_converti = os.path.join(dossier_temp, ods_temp) shutil.copyfile(chemin_ods_converti, fichier_sortie) def ajouter_extension(chemin_fichier, extension): """Ajoute l’extension souhaitée si elle est absente ou incorrecte.""" racine, ext = os.path.splitext(chemin_fichier) ext = ext.lower() extension = extension.lower() # S’assure que l’extension commence par un point if not extension.startswith('.'): extension = '.' + extension # Si l’extension est déjà correcte → ne rien changer if ext == extension: return chemin_fichier # Sinon, remplace ou ajoute la bonne extension return racine + extension def main(): """ Récupère les informations sur les disques des VM et conteneurs de tous les nœuds Proxmox, puis exporte les données dans un fichier XLSX. """ print("Récupération des nœuds...") debut = time.time() noeuds = recuperer_noeuds() fin = time.time() print(f"Temps écoulé pour la récupération des nœuds : {fin - debut:.2f} secondes") if not noeuds: print("Aucun nœud trouvé.") return print("Récupération des VMs...") debut = time.time() vms = recuperer_vms(noeuds) fin = time.time() print(f"Temps écoulé pour la récupération des VMs : {fin - debut:.2f} secondes") if not vms: print("Aucune VM trouvée.") return print("Récupération des stockages...") debut = time.time() infos_stockage = recuperer_infos_stockage() fin = time.time() print(f"Temps écoulé pour la récupération des stockages : {fin - debut:.2f} secondes") if not infos_stockage: print("Aucun stockage trouvé.") return print("Récupération des configurations des VMs...") debut = time.time() configs = recuperer_configs_vms(vms) fin = time.time() print(f"Temps écoulé pour la récupération des configurations : {fin - debut:.2f} secondes") print("Récupération de emplacement des disques...") debut = time.time() chemin_disques = recuperer_info_disques(vms, infos_stockage, configs) fin = time.time() print(f"Temps écoulé pour la récupération de emplacement des disques : {fin - debut:.2f} secondes") if not chemin_disques: print("Aucune information à exporter.") return print("Récupération de la taille des disques...") debut = time.time() taille_disques = recuperer_taille_disques(chemin_disques) fin = time.time() print(f"Temps écoulé pour la récupération de la taille des disques : {fin - debut:.2f} secondes") if not taille_disques: print("Aucune information à exporter.") return print("Croisement des informations...") debut = time.time() disques = croisement_info(chemin_disques, taille_disques) fin = time.time() print(f"Temps écoulé pour la récupération des disques : {fin - debut:.2f} secondes") if not disques: print("Aucune information à exporter.") return print("Exportation des données en Classeur...") nom_du_classeur = exporter_classeur(disques, classeur) print(f"Données exportées dans {nom_du_classeur}") if __name__ == "__main__": main()