ProjetsPython/proxmox_export_disk/proxmox_export_disk.py

399 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import subprocess
import json
import re
import openpyxl
import time
import tempfile
import os
import shutil
# Configuration
hote_proxmox = "proxmox.lan"
utilisateur_ssh = "impt"
classeur = "proxmox_export_disk"
def executer_commande_ssh(commande, entree=None):
"""Exécute une commande sur le serveur Proxmox via SSH en utilisant sudo."""
commande_ssh = [
"ssh",
f"{utilisateur_ssh}@{hote_proxmox}",
f"sudo {commande}"
]
resultat = subprocess.run(commande_ssh, input=entree, capture_output=True, text=True)
# Affiche stderr sil y a quelque chose dedans
if resultat.stderr:
print("---- SSH STDERR ----")
print(resultat.stderr)
if resultat.returncode != 0:
return None
return resultat.stdout
def recuperer_noeuds():
"""Récupère la liste des nœuds du cluster Proxmox."""
donnees_noeuds = executer_commande_ssh("pvesh get /nodes --output-format json")
return json.loads(donnees_noeuds) if donnees_noeuds else []
def recuperer_vms(noeuds):
"""Récupère toutes les VM et conteneurs sur un nœud donné."""
vms = []
for noeud in noeuds:
nom_noeud = noeud.get("node")
for type_vm in ["qemu", "lxc"]:
cmd = f"pvesh get /nodes/{nom_noeud}/{type_vm} --output-format json"
donnees_vms = executer_commande_ssh(cmd)
liste_vm = json.loads(donnees_vms)
for vm in liste_vm:
vm['vmid'] = int(vm.get('vmid'))
vms.extend(liste_vm)
return vms
def recuperer_configs_vms(vms):
"""Récupère les configurations de toutes les VMs/CTs en une seule commande SSH (retour JSON)."""
# Contenu du script Python à exécuter côté serveur
script = """
import json, sys
res={}
for i in json.load(sys.stdin):
v = i.get('vmid')
t = i.get('type')
c = f'/etc/pve/{t}/{v}.conf'
res[v]=open(c).read()
print(json.dumps(res))
"""
# Préparer les données avec les VMID et les types
donnees = [{'vmid': vm.get('vmid'), 'type': vm.get('type', 'qemu-server')} for vm in vms]
# On prépare la commande SSH qui exécute le script Python
commande = f'python3 -c "{script}"'
# On exécute en passant 'data' comme entrée standard du script
sortie = executer_commande_ssh(commande, entree=json.dumps(donnees, separators=(",", ":")))
# Parser le JSON retourné
try:
configs_vms = json.loads(sortie)
return configs_vms
except json.JSONDecodeError as e:
print("Erreur lors du décodage du JSON de configurations des VMs :", e)
return {}
def recuperer_infos_stockage():
"""Récupère les informations de configuration du stockage."""
cmd = "cat /etc/pve/storage.cfg"
sortie = executer_commande_ssh(cmd)
if sortie:
infos_stockage = {}
stockage_actuel = None
for ligne in sortie.splitlines():
if not ligne:
stockage_actuel = None
elif ligne.startswith('\t'):
parties = ligne[1:].split(' ', 1)
cle = parties[0].strip()
valeur = parties[1].strip() if len(parties) > 1 else True
if stockage_actuel:
infos_stockage[stockage_actuel][cle] = valeur
else:
type_stockage, stockage_actuel = ligne.split(': ', 1)
infos_stockage[stockage_actuel] = {'type': type_stockage}
return infos_stockage
def recuperer_info_disques(vms, infos_stockage, configs):
"""Récupère tous les disques à partir des configurations et des tailles déjà récupérées."""
info_disques = []
for vm in vms:
vmid = vm.get("vmid")
type_vm = "lxc" if vm.get("type") == "lxc" else "qemu"
config = configs.get(str(vmid))
if not config:
continue
for ligne in config.splitlines():
if "media=cdrom" in ligne or "iso" in ligne:
continue
elif not ligne: # Arrêter à la première ligne vide
break
elif re.match(r"^(scsi\d|sata\d|ide\d|mp\d|rootfs)", ligne):
try:
infos_ligne = ligne.split(":", 1)[1].split(",")
nom_stockage, nom_disque = [x.strip() for x in infos_ligne[0].split(":")]
point_montage = None
if type_vm == "lxc":
if re.match(r"^(mp\d)", ligne):
point_montage = ligne.split("mp=")[1].split(",")[0]
elif ligne.startswith("rootfs"):
point_montage = "/"
if nom_stockage in infos_stockage:
type_stockage = infos_stockage[nom_stockage]['type']
if type_stockage == "lvmthin":
# LVM
chemin = f"/dev/{infos_stockage[nom_stockage]['vgname']}/{nom_disque}"
elif type_stockage == "dir":
# Fichier raw ou qcow2
chemin = f"{infos_stockage[nom_stockage]['path']}/images/{nom_disque}"
nom_disque = nom_disque.replace(str(vmid) + "/", "")
else:
continue
info_disques.append({'vm': vm, 'chemin': chemin, 'nom_disque': nom_disque, 'nom_stockage': nom_stockage, 'type_stockage': type_stockage, 'point_montage': point_montage})
except (ValueError, AttributeError):
print(f"Erreur de parsing de la ligne de configuration : {ligne}")
return info_disques
def recuperer_taille_disques(info_disques):
tailles_disques = {}
chemin_disques_lvms = []
chemin_disques_fichiers = []
for info_disque in info_disques:
# Récupère la taille d'un disque.
if info_disque.get('type_stockage') == "lvmthin":
# LVM
chemin_disques_lvms.append(info_disque.get('chemin'))
elif info_disque.get('type_stockage') == "dir":
# Fichier raw ou qcow2
chemin_disques_fichiers.append(info_disque.get('chemin'))
tailles_disques.update(recuperer_taille_lvm(chemin_disques_lvms) or {})
tailles_disques.update(recuperer_taille_fichier(chemin_disques_fichiers) or {})
return tailles_disques
def recuperer_taille_lvm(chemin_disques):
"""Analyse les tailles d'un volume logique LVM."""
tailles_disques = {}
cmd = f'lvs -o lv_path,lv_size,data_percent --noheadings --units g --separator "|" --nosuffix {" ".join(chemin_disques)}'
sortie = executer_commande_ssh(cmd)
lignes = sortie.splitlines()
# Analyse LVM
for ligne in lignes:
chemin, taille, pourcentage = ligne.strip().replace(',', '.').split('|')
tailles_disques[chemin] = {
"taille_totale": float(taille),
"pourcentage_utilise": float(pourcentage),
"taille_utilisee": float(taille) * float(pourcentage) / 100
}
return tailles_disques
def recuperer_taille_fichier(chemin_disques):
"""Analyse les tailles d'une image disque (qcow2/raw)."""
tailles_disques = {}
for chemin_disque in chemin_disques:
cmd = f"qemu-img info {chemin_disque}"
sortie = executer_commande_ssh(cmd)
if sortie:
lignes = sortie.splitlines()
taille_virtuelle = None
taille_disque = None
for ligne in lignes:
if 'virtual size:' in ligne:
taille_virtuelle = ligne.split(':')[1].strip().split('(')[0].strip()
elif 'disk size:' in ligne:
taille_disque = ligne.split(':')[1].strip()
if taille_virtuelle and taille_disque:
valeur_virt, unite_virt = taille_virtuelle.split(' ')
valeur_phys, unite_phys = taille_disque.split(' ')
taille_virt = float(valeur_virt) * {'GiB': 1, 'MiB': 1/1024, 'KiB': 1/1024/1024}[unite_virt]
taille_phys = float(valeur_phys) * {'GiB': 1, 'MiB': 1/1024, 'KiB': 1/1024/1024}[unite_phys]
pourcentage_utilise = (taille_phys / taille_virt) * 100
tailles_disques[chemin_disque] = {
"taille_totale": taille_virt,
"pourcentage_utilise": pourcentage_utilise,
"taille_utilisee": taille_phys
}
return tailles_disques
def croisement_info(info_disques, taille_disques):
disques = []
for info_disque in info_disques:
vm = info_disque.get('vm')
taille_disque = taille_disques.get(info_disque.get('chemin'))
if taille_disque is None:
continue
disques.append({
"VMID": vm.get('vmid'),
"Nom": vm.get("name"),
"Type": vm.get("type", "qemu"),
"Disque": info_disque.get('nom_disque'),
"Stockage": info_disque.get('nom_stockage'),
"Taille Totale (GB)": taille_disque.get('taille_totale'),
"% Utilisé": taille_disque.get('pourcentage_utilise') / 100,
"Taille Utilisée (GB)": taille_disque.get('taille_utilisee'),
"Point de Montage": info_disque.get('point_montage'),
})
return disques
def exporter_classeur(disques, fichier_sortie):
"""Exporte les données des disques dans un fichier, en choisissant automatiquement entre XLSX et ODS."""
# Vérifie si LibreOffice est disponible sur le système
libreoffice_disponible = subprocess.run(
["which", "soffice"], capture_output=True, text=True
).returncode == 0
# Si LibreOffice est disponible, exporte en ODS, sinon en XLSX
if libreoffice_disponible:
fichier_sortie_avec_extension = ajouter_extension(fichier_sortie, ".ods")
print("LibreOffice est disponible, exportation en ODS...")
exporter_vers_ods(disques, fichier_sortie_avec_extension)
else:
fichier_sortie_avec_extension = ajouter_extension(fichier_sortie, ".xlsx")
print("LibreOffice n'est pas disponible, exportation en XLSX...")
exporter_vers_xlsx(disques, fichier_sortie_avec_extension)
return fichier_sortie_avec_extension
def exporter_vers_xlsx(disques, fichier_sortie):
"""Exporte les données des disques dans un fichier XLSX avec un filtre automatique."""
champs = ["VMID", "Nom", "Type", "Disque", "Stockage", "Taille Totale (GB)", "% Utilisé", "Taille Utilisée (GB)", "Point de Montage"]
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "Disques"
ws.append(champs)
ws.freeze_panes = "B2"
# Tri croissant des disques par VMID
disques = sorted(disques, key=lambda x: x.get("VMID")) # Si VMID est absent, on lui attribue la valeur 0 par défaut
for disque in disques:
ligne = [disque.get(champ, "") for champ in champs]
ws.append(ligne)
derniere_ligne = ws.max_row
ws[f'A{derniere_ligne + 2}'] = f'=SUBTOTAL(103, A2:A{derniere_ligne})'
ws[f'H{derniere_ligne + 2}'] = f'=SUBTOTAL(109, H2:H{derniere_ligne})'
for ligne in ws.iter_rows(min_row=2, max_row=ws.max_row, max_col=len(champs)):
ligne[5].number_format = '#,##0.00 "GB"'
ligne[6].number_format = '0.00%'
ligne[7].number_format = '#,##0.00 "GB"'
ws.auto_filter.ref = f"A1:{ws.cell(row=derniere_ligne, column=len(champs)).coordinate}"
for colonne in ws.columns:
cellules = [cell for cell in colonne if cell.row <= derniere_ligne]
longueur = max(len(str(cell.value)) if cell.value is not None else 0 for cell in cellules)
largeur = max(longueur + 2, 8)
ws.column_dimensions[colonne[0].column_letter].width = largeur
wb.save(fichier_sortie)
def exporter_vers_ods(disques, fichier_sortie):
"""Exporte directement en ODS sans conserver le XLSX sur disque."""
# Crée un dossier temporaire pour le XLSX
with tempfile.TemporaryDirectory() as dossier_temp:
xlsx_temp = os.path.join(dossier_temp, "temp.xlsx")
ods_temp = os.path.join(dossier_temp, "temp.ods")
# Crée le XLSX
exporter_vers_xlsx(disques, xlsx_temp)
# Utilise LibreOffice pour convertir le fichier temporaire
commande = [
"soffice",
"--headless",
"--convert-to", "ods",
"--outdir", dossier_temp,
xlsx_temp
]
subprocess.run(commande, capture_output=True, check=True)
# Renomme le fichier généré par LibreOffice au bon nom de sortie
chemin_ods_converti = os.path.join(dossier_temp, ods_temp)
shutil.copyfile(chemin_ods_converti, fichier_sortie)
def ajouter_extension(chemin_fichier, extension):
"""Ajoute lextension souhaitée si elle est absente ou incorrecte."""
racine, ext = os.path.splitext(chemin_fichier)
ext = ext.lower()
extension = extension.lower()
# Sassure que lextension commence par un point
if not extension.startswith('.'):
extension = '.' + extension
# Si lextension est déjà correcte → ne rien changer
if ext == extension:
return chemin_fichier
# Sinon, remplace ou ajoute la bonne extension
return racine + extension
def main():
""" Récupère les informations sur les disques des VM et conteneurs de tous les nœuds Proxmox, puis exporte les données dans un fichier XLSX. """
print("Récupération des nœuds...")
debut = time.time()
noeuds = recuperer_noeuds()
fin = time.time()
print(f"Temps écoulé pour la récupération des nœuds : {fin - debut:.2f} secondes")
if not noeuds:
print("Aucun nœud trouvé.")
return
print("Récupération des VMs...")
debut = time.time()
vms = recuperer_vms(noeuds)
fin = time.time()
print(f"Temps écoulé pour la récupération des VMs : {fin - debut:.2f} secondes")
if not vms:
print("Aucune VM trouvée.")
return
print("Récupération des stockages...")
debut = time.time()
infos_stockage = recuperer_infos_stockage()
fin = time.time()
print(f"Temps écoulé pour la récupération des stockages : {fin - debut:.2f} secondes")
if not infos_stockage:
print("Aucun stockage trouvé.")
return
print("Récupération des configurations des VMs...")
debut = time.time()
configs = recuperer_configs_vms(vms)
fin = time.time()
print(f"Temps écoulé pour la récupération des configurations : {fin - debut:.2f} secondes")
print("Récupération de emplacement des disques...")
debut = time.time()
chemin_disques = recuperer_info_disques(vms, infos_stockage, configs)
fin = time.time()
print(f"Temps écoulé pour la récupération de emplacement des disques : {fin - debut:.2f} secondes")
if not chemin_disques:
print("Aucune information à exporter.")
return
print("Récupération de la taille des disques...")
debut = time.time()
taille_disques = recuperer_taille_disques(chemin_disques)
fin = time.time()
print(f"Temps écoulé pour la récupération de la taille des disques : {fin - debut:.2f} secondes")
if not taille_disques:
print("Aucune information à exporter.")
return
print("Croisement des informations...")
debut = time.time()
disques = croisement_info(chemin_disques, taille_disques)
fin = time.time()
print(f"Temps écoulé pour la récupération des disques : {fin - debut:.2f} secondes")
if not disques:
print("Aucune information à exporter.")
return
print("Exportation des données en Classeur...")
nom_du_classeur = exporter_classeur(disques, classeur)
print(f"Données exportées dans {nom_du_classeur}")
if __name__ == "__main__":
main()