hyper2kvm

hyper2kvm Library API

Table of Contents


Overview

hyper2kvm can be used both as a command-line tool and as a Python library. The library API provides programmatic control over VM migration, allowing you to:

Design Principles


Installation

hyper2kvm is published on PyPI and can be installed with pip:

pip install hyper2kvm

PyPI Package: https://pypi.org/project/hyper2kvm/

From Source

For development or to get the latest unreleased features:

git clone https://github.com/ssahani/hyper2kvm.git
cd hyper2kvm
pip install -e .

GitHub Repository: https://github.com/ssahani/hyper2kvm

Dependencies

The library requires the same dependencies as the CLI:

# Fedora/RHEL
sudo dnf install -y \
    python3-rich \
    python3-click \
    python3-pyyaml \
    python3-requests \
    python3-pyvmomi \
    libguestfs-tools \
    qemu-img

# Ubuntu/Debian
sudo apt install -y \
    python3-rich \
    python3-click \
    python3-yaml \
    python3-requests \
    python3-pyvmomi \
    libguestfs-tools \
    qemu-utils

Quick Start

Basic Local Conversion

from hyper2kvm import DiskProcessor

# Convert VMDK to qcow2
processor = DiskProcessor()
result = processor.process_disk(
    source_path='/data/vm.vmdk',
    output_path='/data/vm.qcow2',
    flatten=True,
    compress=True
)

print(f"Conversion complete: {result.output_path}")

Detect Guest OS

from hyper2kvm import GuestDetector

# Detect guest operating system
detector = GuestDetector()
guest = detector.detect('/mnt/guest-disk')

print(f"Detected: {guest.os_pretty}")
print(f"Type: {guest.guest_type}")

Full Migration Workflow

from hyper2kvm import Orchestrator, VMwareClient

# Connect to vSphere
client = VMwareClient(
    host='vcenter.example.com',
    user='administrator@vsphere.local',
    password='password',
    datacenter='DC1'
)

# Run migration
orchestrator = Orchestrator(vmware_client=client)
result = orchestrator.run(
    vm_name='production-web-01',
    output_dir='/var/lib/libvirt/images',
    compress=True
)

print(f"Migration complete: {result.output_path}")

API Levels

For most users, the high-level API provides everything needed:

from hyper2kvm import (
    # Main orchestrator
    Orchestrator,
    DiskProcessor,

    # Guest detection
    GuestIdentity,
    GuestDetector,
    GuestType,

    # Platform providers
    AzureSourceProvider,
    AzureConfig,
    VMwareClient,

    # Version
    __version__,
)

Use cases:

Level 2: Mid-Level API

For advanced users who need more control:

from hyper2kvm.orchestrator import (
    VsphereExporter,
)

from hyper2kvm.converters import (
    Flatten,
    Convert,
    OVF,
)

from hyper2kvm.fixers import (
    OfflineFSFix,
    NetworkFixer,
    LiveFixer,
)

from hyper2kvm.testers import (
    QemuTest,
    LibvirtTest,
)

Use cases:

Level 3: Low-Level API

For library developers and specialized workflows:

from hyper2kvm.vmware.clients import VMwareClient
from hyper2kvm.vmware.transports import VDDKTransport, HTTPTransport
from hyper2kvm.fixers.bootloader import GrubFixer
from hyper2kvm.fixers.filesystem import FstabFixer
from hyper2kvm.fixers.network import NetworkTopology

Use cases:


Usage Examples

Local VMDK Conversion

Convert a local VMDK file to qcow2 with compression:

from hyper2kvm import DiskProcessor, GuestDetector

# Initialize processor
processor = DiskProcessor()

# Optional: Detect guest OS for optimizations
detector = GuestDetector()
guest = detector.detect('/mnt/source-disk')

# Convert disk
result = processor.process_disk(
    source_path='/data/vm.vmdk',
    output_path='/data/vm.qcow2',
    flatten=True,
    compress=True,
    guest_identity=guest
)

print(f"Conversion complete!")
print(f"  Input:  {result.source_path}")
print(f"  Output: {result.output_path}")
print(f"  Size:   {result.output_size} bytes")
print(f"  Time:   {result.duration}s")

vSphere Migration

Migrate a VM from vCenter/ESXi to KVM:

from hyper2kvm import VMwareClient, Orchestrator

# Connect to vSphere
client = VMwareClient(
    host='vcenter.example.com',
    user='administrator@vsphere.local',
    password='password',
    datacenter='DC1',
    insecure=False  # Set True to skip SSL verification
)

# List available VMs
vms = client.list_vms()
print(f"Found {len(vms)} VMs")

# Export specific VM
result = client.export_vm(
    vm_name='rhel9-prod',
    output_dir='/export/vms',
    transport='vddk',
    vddk_libdir='/opt/vmware-vix-disklib-distrib'
)

print(f"Exported {result.vm_name}")
print(f"  Disks: {len(result.disks)}")
print(f"  Path:  {result.output_dir}")

Azure Migration

Migrate a VM from Azure to KVM:

from hyper2kvm import AzureSourceProvider, AzureConfig, Orchestrator

# Configure Azure source
config = AzureConfig(
    subscription_id='your-subscription-id',
    resource_group='my-rg',
    vm_name='ubuntu-vm-01',
    # Optional: Use managed identity or service principal
    tenant_id='your-tenant-id',
    client_id='your-client-id',
    client_secret='your-client-secret'
)

# Initialize provider
provider = AzureSourceProvider(config)

# Run full migration
orchestrator = Orchestrator(source_provider=provider)
result = orchestrator.run(
    output_dir='/var/lib/libvirt/images',
    compress=True,
    apply_fixes=True
)

print(f"Migration complete!")
print(f"  Source: {result.source_vm}")
print(f"  Output: {result.output_path}")
print(f"  Fixes:  {len(result.fixes_applied)}")

Guest OS Fixing

Apply offline fixes to a converted disk image:

from hyper2kvm.fixers import OfflineFSFix
from hyper2kvm import GuestDetector

# Detect guest type
detector = GuestDetector()
guest = detector.detect('/mnt/guest-disk')

print(f"Detected: {guest.os_pretty}")

# Apply fixes
fixer = OfflineFSFix(
    image_path='/var/lib/libvirt/images/vm.qcow2',
    guest_identity=guest,
    verbose=True
)

# Fix fstab (UUID-based mounting)
fstab_result = fixer.fix_fstab()
print(f"Fixed fstab: {fstab_result.changes_made} changes")

# Fix GRUB bootloader
grub_result = fixer.fix_grub()
print(f"Fixed GRUB: {grub_result.success}")

# Fix network configuration
network_result = fixer.fix_network()
print(f"Fixed network: {network_result.interfaces_fixed} interfaces")

# Regenerate initramfs
initramfs_result = fixer.regenerate_initramfs()
print(f"Regenerated initramfs: {initramfs_result.success}")

# Generate report
report = fixer.generate_report()
print(f"\nReport saved to: {report.path}")

Boot Testing

Test a migrated VM boots correctly:

from hyper2kvm.testers import QemuTest

# Test boot with QEMU
tester = QemuTest(
    image_path='/var/lib/libvirt/images/vm.qcow2',
    memory=4096,
    vcpus=2,
    uefi=True,
    timeout=120,
    headless=False  # Set True for automated testing
)

# Run boot test
result = tester.test_boot()

if result.success:
    print(f"✓ Boot successful in {result.boot_time}s")
    print(f"  Console output: {result.console_log}")
else:
    print(f"✗ Boot failed: {result.error}")
    print(f"  Last output: {result.last_console_lines}")

Custom Workflows

Build a custom migration pipeline:

from hyper2kvm import VMwareClient, GuestDetector
from hyper2kvm.converters import Flatten, Convert
from hyper2kvm.fixers import OfflineFSFix
from hyper2kvm.testers import LibvirtTest
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def migrate_vm(vcenter_host, vm_name, output_dir):
    """Custom migration workflow."""

    # Step 1: Export from vSphere
    logger.info(f"Exporting {vm_name} from vCenter...")
    client = VMwareClient(host=vcenter_host, user='admin', password='pass')
    export_result = client.export_vm(vm_name, output_dir='/tmp/export')

    # Step 2: Flatten VMDK
    logger.info("Flattening VMDK...")
    flattener = Flatten()
    flat_vmdk = flattener.flatten(
        source=export_result.disks[0],
        output=f'/tmp/{vm_name}-flat.vmdk'
    )

    # Step 3: Convert to qcow2
    logger.info("Converting to qcow2...")
    converter = Convert()
    qcow2_path = f'{output_dir}/{vm_name}.qcow2'
    converter.convert(
        source=flat_vmdk,
        output=qcow2_path,
        format='qcow2',
        compress=True
    )

    # Step 4: Detect guest and apply fixes
    logger.info("Detecting guest OS...")
    detector = GuestDetector()
    guest = detector.detect_from_image(qcow2_path)

    logger.info(f"Applying fixes for {guest.os_pretty}...")
    fixer = OfflineFSFix(image_path=qcow2_path, guest_identity=guest)
    fixer.fix_fstab()
    fixer.fix_grub()
    fixer.fix_network()

    # Step 5: Boot test
    logger.info("Testing boot...")
    tester = LibvirtTest(
        image_path=qcow2_path,
        memory=4096,
        vcpus=2,
        uefi=(guest.firmware == 'uefi')
    )
    test_result = tester.test_boot()

    if not test_result.success:
        raise RuntimeError(f"Boot test failed: {test_result.error}")

    logger.info(f"Migration complete: {qcow2_path}")
    return qcow2_path

# Run migration
result = migrate_vm(
    vcenter_host='vcenter.example.com',
    vm_name='production-web-01',
    output_dir='/var/lib/libvirt/images'
)

API Reference

Orchestration

Orchestrator

High-level orchestrator for complete VM migrations.

class Orchestrator:
    def __init__(
        self,
        source_provider=None,
        vmware_client=None,
        azure_provider=None,
        config=None
    ):
        """Initialize orchestrator with source provider."""
        ...

    def run(
        self,
        vm_name=None,
        output_dir=None,
        compress=True,
        apply_fixes=True,
        test_boot=False
    ):
        """
        Run complete migration workflow.

        Returns:
            MigrationResult with details of the migration
        """
        ...

DiskProcessor

Process individual disk images.

class DiskProcessor:
    def process_disk(
        self,
        source_path: str,
        output_path: str,
        flatten: bool = False,
        compress: bool = False,
        guest_identity=None
    ):
        """
        Convert and process a disk image.

        Args:
            source_path: Input disk path (VMDK, VHD, etc.)
            output_path: Output qcow2 path
            flatten: Flatten multi-part VMDKs
            compress: Compress output image
            guest_identity: Optional GuestIdentity for optimizations

        Returns:
            ProcessResult with conversion details
        """
        ...

Guest Detection

GuestDetector

Detect guest operating system from disk images.

class GuestDetector:
    def detect(self, mount_point: str) -> GuestIdentity:
        """Detect guest OS from mounted filesystem."""
        ...

    def detect_from_image(self, image_path: str) -> GuestIdentity:
        """Detect guest OS from disk image (auto-mounts)."""
        ...

GuestIdentity

Information about detected guest OS.

@dataclass
class GuestIdentity:
    guest_type: GuestType
    os_id: str
    os_version: str
    os_pretty: str
    architecture: str
    firmware: str  # 'bios' or 'uefi'
    init_system: str  # 'systemd', 'upstart', 'sysv'
    package_manager: str  # 'dnf', 'apt', 'zypper', etc.

GuestType

Enum of supported guest OS types.

class GuestType(Enum):
    LINUX = "linux"
    WINDOWS = "windows"
    UNKNOWN = "unknown"

Platform Providers

VMwareClient

vSphere/vCenter client for VM export.

class VMwareClient:
    def __init__(
        self,
        host: str,
        user: str,
        password: str,
        datacenter: str = None,
        insecure: bool = False
    ):
        """Connect to vCenter/ESXi."""
        ...

    def list_vms(self) -> List[str]:
        """List all VM names."""
        ...

    def export_vm(
        self,
        vm_name: str,
        output_dir: str,
        transport: str = 'vddk',
        vddk_libdir: str = None
    ):
        """Export VM to local disk."""
        ...

AzureSourceProvider

Azure VM migration provider.

class AzureSourceProvider:
    def __init__(self, config: AzureConfig):
        """Initialize with Azure configuration."""
        ...

    def download_vm(self, output_dir: str):
        """Download VM disks from Azure."""
        ...

AzureConfig

Azure connection configuration.

@dataclass
class AzureConfig:
    subscription_id: str
    resource_group: str
    vm_name: str
    tenant_id: str = None
    client_id: str = None
    client_secret: str = None

Converters

Flatten

Flatten multi-part VMDK files.

class Flatten:
    def flatten(self, source: str, output: str) -> str:
        """Flatten VMDK snapshot chain."""
        ...

Convert

Convert between disk formats.

class Convert:
    def convert(
        self,
        source: str,
        output: str,
        format: str = 'qcow2',
        compress: bool = False
    ) -> str:
        """Convert disk format using qemu-img."""
        ...

OVF

Extract VMs from OVF/OVA packages.

class OVF:
    def extract(self, source: str, output_dir: str):
        """Extract OVF/OVA package."""
        ...

Fixers

OfflineFSFix

Offline guest OS fixer (libguestfs-based).

class OfflineFSFix:
    def __init__(
        self,
        image_path: str,
        guest_identity: GuestIdentity,
        verbose: bool = False
    ):
        """Initialize offline fixer."""
        ...

    def fix_fstab(self) -> FixResult:
        """Fix /etc/fstab for KVM."""
        ...

    def fix_grub(self) -> FixResult:
        """Fix GRUB bootloader configuration."""
        ...

    def fix_network(self) -> FixResult:
        """Fix network interface configuration."""
        ...

    def regenerate_initramfs(self) -> FixResult:
        """Regenerate initramfs with virtio drivers."""
        ...

    def remove_vmware_tools(self) -> FixResult:
        """Remove VMware Tools."""
        ...

    def generate_report(self) -> Report:
        """Generate migration report."""
        ...

NetworkFixer

Advanced network configuration fixer.

class NetworkFixer:
    def fix(self, guest: guestfs.GuestFS) -> FixResult:
        """Fix network configuration for KVM."""
        ...

LiveFixer

Online fixer (for running VMs).

class LiveFixer:
    def __init__(self, ssh_host: str, ssh_user: str, ssh_key: str):
        """Initialize live fixer with SSH access."""
        ...

    def apply_fixes(self) -> FixResult:
        """Apply fixes to running VM."""
        ...

Testers

QemuTest

QEMU-based boot testing.

class QemuTest:
    def __init__(
        self,
        image_path: str,
        memory: int = 2048,
        vcpus: int = 1,
        uefi: bool = False,
        timeout: int = 120,
        headless: bool = True
    ):
        """Initialize QEMU tester."""
        ...

    def test_boot(self) -> TestResult:
        """Test if VM boots successfully."""
        ...

LibvirtTest

Libvirt-based boot testing.

class LibvirtTest:
    def __init__(
        self,
        image_path: str,
        memory: int = 2048,
        vcpus: int = 1,
        uefi: bool = False,
        timeout: int = 120
    ):
        """Initialize libvirt tester."""
        ...

    def test_boot(self) -> TestResult:
        """Test boot using libvirt."""
        ...

Error Handling

All library functions raise exceptions on errors. Use standard Python exception handling:

from hyper2kvm import VMwareClient, Orchestrator
from hyper2kvm.exceptions import (
    ConnectionError,
    ConversionError,
    GuestDetectionError,
    BootTestError
)

try:
    client = VMwareClient(host='vcenter.example.com', ...)
    orchestrator = Orchestrator(vmware_client=client)
    result = orchestrator.run(vm_name='prod-vm')

except ConnectionError as e:
    print(f"Failed to connect to vCenter: {e}")

except ConversionError as e:
    print(f"Disk conversion failed: {e}")
    print(f"  Source: {e.source_path}")
    print(f"  Output: {e.output_path}")

except GuestDetectionError as e:
    print(f"Could not detect guest OS: {e}")

except BootTestError as e:
    print(f"Boot test failed: {e}")
    print(f"  Console: {e.console_output}")

except Exception as e:
    print(f"Unexpected error: {e}")

Common Exceptions


Best Practices

1. Always Detect Guest OS

Guest detection enables optimizations and proper fixes:

# Good
detector = GuestDetector()
guest = detector.detect_from_image(image_path)
fixer = OfflineFSFix(image_path, guest_identity=guest)

# Less optimal
fixer = OfflineFSFix(image_path)  # Will auto-detect, but slower

2. Use Context Managers for Cleanup

from hyper2kvm import VMwareClient

with VMwareClient(host='vcenter.example.com', ...) as client:
    result = client.export_vm('vm-name', '/output')
    # Connection automatically closed

3. Enable Verbose Logging for Debugging

import logging

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Now all hyper2kvm operations will log details

4. Test Boots Before Production

from hyper2kvm.testers import QemuTest

# Always test boot after migration
tester = QemuTest(image_path=result.output_path, timeout=180)
test_result = tester.test_boot()

if not test_result.success:
    raise RuntimeError(f"Boot test failed: {test_result.error}")

5. Generate Migration Reports

# Generate detailed report of all changes
fixer = OfflineFSFix(image_path, guest_identity=guest)
fixer.fix_fstab()
fixer.fix_grub()
fixer.fix_network()

report = fixer.generate_report()
print(f"Report saved to: {report.path}")

6. Handle Partial Failures Gracefully

fixes_applied = []
fixes_failed = []

for fix_name, fix_func in [
    ('fstab', fixer.fix_fstab),
    ('grub', fixer.fix_grub),
    ('network', fixer.fix_network),
]:
    try:
        result = fix_func()
        if result.success:
            fixes_applied.append(fix_name)
        else:
            fixes_failed.append((fix_name, result.error))
    except Exception as e:
        fixes_failed.append((fix_name, str(e)))

print(f"Applied {len(fixes_applied)} fixes")
print(f"Failed {len(fixes_failed)} fixes")

Migration from CLI to Library

If you’re using the CLI and want to migrate to the library, here’s how:

CLI Command

sudo python -m hyper2kvm vsphere \
    --vcenter vcenter.example.com \
    --user administrator@vsphere.local \
    --password-env VC_PASSWORD \
    --datacenter DC1 \
    --vm rhel9-prod \
    --output-dir /var/lib/libvirt/images \
    --compress

Equivalent Library Code

import os
from hyper2kvm import VMwareClient, Orchestrator

# Get password from environment
password = os.environ['VC_PASSWORD']

# Connect to vSphere
client = VMwareClient(
    host='vcenter.example.com',
    user='administrator@vsphere.local',
    password=password,
    datacenter='DC1'
)

# Run migration
orchestrator = Orchestrator(vmware_client=client)
result = orchestrator.run(
    vm_name='rhel9-prod',
    output_dir='/var/lib/libvirt/images',
    compress=True
)

print(f"Migration complete: {result.output_path}")

Batch Operations

CLI:

for vm in vm1 vm2 vm3; do
    sudo python -m hyper2kvm vsphere --vm $vm ...
done

Library:

from hyper2kvm import VMwareClient, Orchestrator

client = VMwareClient(...)
orchestrator = Orchestrator(vmware_client=client)

for vm_name in ['vm1', 'vm2', 'vm3']:
    try:
        result = orchestrator.run(vm_name=vm_name, ...)
        print(f"✓ Migrated {vm_name}")
    except Exception as e:
        print(f"✗ Failed {vm_name}: {e}")

Batch Migration APIs

hyper2kvm provides enterprise-grade batch migration capabilities with comprehensive APIs for batch processing, automation, and integration.

Batch Orchestration

Convert multiple VMs in parallel with centralized orchestration and progress tracking.

BatchOrchestrator

Import:

from hyper2kvm.manifest.batch_orchestrator import BatchOrchestrator

Basic usage:

from pathlib import Path

# Create batch orchestrator
orchestrator = BatchOrchestrator(
    batch_manifest_path="/path/to/batch.json",
    logger=my_logger,
    enable_checkpoint=True,
    enable_progress=True
)

# Run batch conversion
result = orchestrator.run()

# Check results
print(f"Total VMs: {result['total_vms']}")
print(f"Successful: {result['successful']}")
print(f"Failed: {result['failed']}")

With custom parallel limit:

# Batch manifest controls parallelism
{
  "batch_version": "1.0",
  "batch_metadata": {
    "batch_id": "my-migration",
    "parallel_limit": 8,  # Run 8 VMs concurrently
    "continue_on_error": true
  },
  "vms": [...]
}

Resume from checkpoint:

from hyper2kvm.manifest.checkpoint_manager import CheckpointManager

# Load existing checkpoint
checkpoint_file = Path("/output/.hyper2kvm-checkpoint-batch-id.json")
checkpoint = CheckpointManager.load_checkpoint(checkpoint_file)

if checkpoint:
    print(f"Resuming from VM {checkpoint['resume_from']}")
    print(f"Already completed: {len(checkpoint['completed_vms'])}")

# Orchestrator automatically resumes if checkpoint exists
orchestrator = BatchOrchestrator(
    batch_manifest_path="/path/to/batch.json",
    enable_checkpoint=True
)
result = orchestrator.run()  # Skips completed VMs

Key methods:


Migration Profiles

Reusable configuration templates with inheritance support.

ProfileLoader

Import:

from hyper2kvm.profiles.profile_loader import ProfileLoader

Load built-in profile:

loader = ProfileLoader()

# Load production profile
profile = loader.load_profile("production")
print(f"Pipeline config: {profile['pipeline']}")

# List available profiles
built_in_profiles = [
    "production",  # Full conversion with validation
    "testing",     # Fast conversion, skip validation
    "minimal",     # Extract and fix only
    "fast",        # Minimal fixes, no compression
    "windows",     # Windows-specific
    "archive",     # Maximum compression
    "debug"        # Verbose logging
]

Load custom profile:

from pathlib import Path

loader = ProfileLoader()

# Load custom profile from directory
custom_profile_dir = Path("/etc/hyper2kvm/profiles")
profile = loader.load_profile(
    "my-custom-profile",
    custom_profile_path=custom_profile_dir / "my-custom-profile.yaml"
)

Profile with inheritance:

# custom-profile.yaml
extends: production

pipeline:
  convert:
    compress_level: 9  # Override just compression

output:
  format: qcow2

Programmatic profile creation:

# Create profile dict
custom_profile = {
    "pipeline": {
        "fix": {
            "enabled": True,
            "regen_initramfs": True
        },
        "convert": {
            "enabled": True,
            "compress": True
        }
    },
    "output": {
        "format": "qcow2"
    }
}

# Merge with built-in profile
from hyper2kvm.config.config import Config
base_profile = loader.load_profile("production")
merged = Config.merge_dicts(base_profile, custom_profile)

Profile caching (Phase 7.3):

from hyper2kvm.profiles.profile_cache import get_global_cache

# Get cache statistics
cache = get_global_cache()
stats = cache.get_stats()
print(f"Cache hits: {stats['hits']}, misses: {stats['misses']}")

# Clear cache
cache.clear()

Hook System

Execute custom scripts, Python functions, or HTTP webhooks at pipeline stages.

HookRunner

Import:

from hyper2kvm.hooks.hook_runner import HookRunner

Create from manifest:

# Manifest with hooks
manifest = {
    "hooks": {
        "post_convert": [
            {
                "type": "script",
                "path": "/scripts/notify.sh",
                "env": {"VM_NAME": ""},
                "timeout": 300
            }
        ]
    }
}

# Create hook runner
runner = HookRunner.from_manifest(manifest, logger=my_logger)

# Execute hooks at specific stage
context = {
    "vm_name": "my-vm",
    "output_path": "/converted/my-vm.qcow2"
}
success = runner.execute_stage_hooks("post_convert", context)

Script hook:

from hyper2kvm.hooks.hook_types import ScriptHook

hook = ScriptHook(
    path="/scripts/backup.sh",
    env={"SOURCE": "", "DEST": "/backup"},
    args=["--verbose"],
    timeout=600,
    continue_on_error=False,
    logger=my_logger
)

# Execute hook
context = {"source_path": "/vmdk/disk.vmdk"}
result = hook.execute(context)
print(f"Exit code: {result['exit_code']}")

Python hook:

from hyper2kvm.hooks.hook_types import PythonHook

hook = PythonHook(
    module="my_validators",
    function="check_disk_size",
    args={"disk_path": "", "min_size_gb": 10},
    timeout=300,
    logger=my_logger
)

result = hook.execute({"output_path": "/converted/disk.qcow2"})

HTTP hook with retry (Phase 7.2):

from hyper2kvm.hooks.hook_types import HttpHook

hook = HttpHook(
    url="https://api.example.com/notify",
    method="POST",
    headers={"Content-Type": "application/json"},
    body={"vm": "", "status": "completed"},
    timeout=30,
    max_retries=3,
    retry_delay=5,
    retry_strategy="exponential",  # exponential, linear, constant
    max_delay=60,
    logger=my_logger
)

result = hook.execute({"vm_name": "my-vm"})

Available stages:


Libvirt Integration

Automatic domain creation and storage pool management.

LibvirtManager

Import:

from hyper2kvm.libvirt.libvirt_manager import LibvirtManager

Define domain from XML:

from pathlib import Path

manager = LibvirtManager(logger=my_logger)

# Define domain
xml_path = Path("/output/my-vm.xml")
domain_name = manager.define_domain(
    xml_path=xml_path,
    overwrite=False  # Fail if domain already exists
)
print(f"Defined domain: {domain_name}")

# Auto-start domain (boot immediately)
manager.auto_start_domain(domain_name)

# Set autostart on host boot
manager.set_autostart(domain_name, enabled=True)

Create snapshot:

snapshot_name = manager.create_snapshot(
    domain_name="my-vm",
    snapshot_name="pre-first-boot",
    description="Snapshot before first boot"
)

PoolManager

Import:

from hyper2kvm.libvirt.pool_manager import PoolManager

Import disk to pool:

pool_mgr = PoolManager(logger=my_logger)

# Import disk to storage pool
volume_name = pool_mgr.import_disk_to_pool(
    disk_path=Path("/converted/my-vm.qcow2"),
    pool_name="vms",
    volume_name="my-vm-disk",
    overwrite=False
)
print(f"Imported volume: {volume_name}")

Complete libvirt workflow:

from hyper2kvm.libvirt import LibvirtManager, PoolManager

# Import disk
pool_mgr = PoolManager()
volume = pool_mgr.import_disk_to_pool(
    disk_path=Path("/converted/vm.qcow2"),
    pool_name="production-vms",
    volume_name="vm-disk"
)

# Define domain
libvirt_mgr = LibvirtManager()
domain = libvirt_mgr.define_domain(
    xml_path=Path("/converted/vm.xml")
)

# Create pre-boot snapshot
libvirt_mgr.create_snapshot(domain, "pre-first-boot")

# Start VM
libvirt_mgr.auto_start_domain(domain)

Validation Framework

Extensible validation with multiple severity levels.

Validators

Import:

from hyper2kvm.validation import (
    DiskValidator,
    XMLValidator,
    ValidationRunner,
    ValidationSeverity
)

Disk validation:

validator = DiskValidator()

context = {
    "output_path": "/converted/disk.qcow2",
    "format": "qcow2",
    "minimum_size": 1 * 1024 * 1024 * 1024  # 1GB
}

report = validator.validate(context)

# Check results
if report.has_errors():
    print("Validation failed!")
    for error in report.get_issues_by_severity(ValidationSeverity.ERROR):
        print(f"  ERROR: {error.message}")
        for suggestion in error.suggestions:
            print(f"    Suggestion: {suggestion}")

XML validation:

validator = XMLValidator()

report = validator.validate({"xml_path": "/output/domain.xml"})

print(f"Passed: {report.passed_checks}/{report.total_checks}")

Multiple validators:

runner = ValidationRunner()
runner.add_validator(DiskValidator())
runner.add_validator(XMLValidator())

# Run all validators
context = {
    "output_path": "/converted/disk.qcow2",
    "format": "qcow2",
    "xml_path": "/output/domain.xml"
}

reports = runner.run_all(context)

# Aggregate summary
summary = runner.get_aggregate_summary(reports)
print(f"Total validators: {summary['total_validators']}")
print(f"Total checks: {summary['total_checks']}")
print(f"All passed: {not summary['has_errors']}")

Custom validator:

from hyper2kvm.validation import BaseValidator, ValidationSeverity

class NetworkValidator(BaseValidator):
    def validate(self, context):
        import time
        start_time = time.time()

        network_count = context.get("network_count", 0)

        if network_count > 0:
            self._add_result(
                check_name="has_networks",
                passed=True,
                severity=ValidationSeverity.INFO,
                message=f"Domain has {network_count} network(s)"
            )
        else:
            self._add_result(
                check_name="has_networks",
                passed=False,
                severity=ValidationSeverity.WARNING,
                message="Domain has no networks",
                suggestions=["Add at least one network interface"]
            )

        self.report.duration = time.time() - start_time
        return self.report

# Use custom validator
validator = NetworkValidator()
report = validator.validate({"network_count": 2})

Progress Tracking

Real-time progress persistence for monitoring long-running conversions.

ProgressTracker

Import:

from hyper2kvm.manifest.batch_progress import (
    ProgressTracker,
    VMStatus,
    BatchProgress
)

Track conversion progress:

from pathlib import Path

# Create progress tracker
tracker = ProgressTracker(
    progress_file=Path("/output/progress.json"),
    batch_id="my-batch",
    total_vms=10,
    logger=my_logger
)

# Track VM lifecycle
tracker.start_vm("vm1")
tracker.update_vm_stage("vm1", "extraction")
tracker.update_vm_stage("vm1", "fix")
tracker.update_vm_stage("vm1", "convert")
tracker.complete_vm("vm1", success=True)

# Get current progress
progress = tracker.get_progress()
print(f"Completion: {progress.get_completion_percentage()}%")
print(f"Estimated time remaining: {progress.get_estimated_time_remaining()}s")

# Complete batch
tracker.complete_batch()
tracker.cleanup()  # Remove progress file

Monitor external progress:

# Load progress from file (for monitoring tools)
progress = ProgressTracker.load_progress(
    Path("/output/.hyper2kvm-batch-progress-batch-id.json")
)

if progress:
    counts = progress.get_counts()
    print(f"Completed: {counts['completed']}")
    print(f"In progress: {counts['in_progress']}")
    print(f"Failed: {counts['failed']}")
    print(f"Pending: {counts['pending']}")

    # Get per-VM details
    for vm_id, vm_progress in progress.vms.items():
        print(f"{vm_id}: {vm_progress.status.value}")
        if vm_progress.status == VMStatus.COMPLETED:
            print(f"  Duration: {vm_progress.duration}s")

Progress JSON format:

{
  "batch_id": "my-batch",
  "total_vms": 10,
  "counts": {
    "pending": 3,
    "in_progress": 2,
    "completed": 4,
    "failed": 1,
    "skipped": 0
  },
  "completion_percentage": 50.0,
  "estimated_time_remaining": 450.0,
  "vms": {
    "vm1": {
      "vm_id": "vm1",
      "status": "completed",
      "started_at": 1737558000.0,
      "completed_at": 1737558050.0,
      "duration": 50.0,
      "stages_completed": ["extraction", "fix", "convert"]
    }
  }
}

Next Steps


Status: Library API fully implemented and documented