VMCraft v9.2 - Pure Python VM disk image manipulation library with AI/ML intelligence and enterprise systemd support
VMCraft is hyper2kvm’s advanced disk image manipulation platform, providing comprehensive VM inspection, modification, and intelligence capabilities through a pure Python implementation.
Current Version: v9.2 (January 2026)
Statistics:
VMCraft uses a modular architecture with focused, single-responsibility components:
hyper2kvm/core/vmcraft/
├── main.py # VMCraft orchestrator class
├── _utils.py # Shared utilities
│
├── Core Infrastructure
│ ├── nbd.py # NBD device management (with retry logic)
│ ├── storage.py # LVM, LUKS, RAID, ZFS activation + LVM creation
│ ├── mount.py # Filesystem mounting (parallel + fallback)
│ ├── file_ops.py # File operations (70+ methods)
│ └── augeas_mgr.py # Augeas configuration management (v9.1)
│
├── OS Detection
│ ├── inspection.py # OS inspection orchestration
│ ├── linux_detection.py # Linux distribution detection (15+ distros)
│ └── windows_detection.py # Windows version detection (20+ versions)
│
├── Windows Support
│ ├── windows_registry.py # Registry operations
│ ├── windows_drivers.py # Driver injection
│ ├── windows_users.py # User account management
│ ├── windows_services.py # Service control
│ ├── windows_applications.py # Application detection
│ └── scheduled_tasks.py # Task Scheduler automation
│
├── Linux Support
│ ├── linux_services.py # Systemd/init service management (legacy)
│ ├── systemd_mgr.py # Core systemd service management (17 APIs)
│ ├── systemd_networkd.py # systemd-networkd configuration (12 APIs)
│ ├── systemd_journal.py # Journal log access & analysis (10 APIs)
│ └── systemd_units.py # Unit file management & analysis (13 APIs)
│
├── Enterprise Intelligence (v9.0)
│ ├── ml_analyzer.py # AI/ML analytics (7 methods)
│ ├── cloud_optimizer.py # Cloud migration planning (6 methods)
│ ├── disaster_recovery.py # DR planning (6 methods)
│ ├── audit_trail.py # Compliance logging (7 methods)
│ └── resource_orchestrator.py # Auto-scaling (7 methods)
│
└── Operational Tools
├── backup.py # Backup and restore
├── security.py # Security auditing
├── optimization.py # Disk optimization
├── advanced_analysis.py # Forensic analysis
└── export.py # VM export and packaging
VMCraft v9.2 delivers comprehensive systemd integration for enterprise Linux management, completing the transformation into a full-featured VM manipulation platform.
VMCraft now provides complete systemd integration across 4 specialized modules, enabling comprehensive Linux system management during VM migrations.
# Service control
g.systemd_service_enable("qemu-guest-agent")
g.systemd_service_start("sshd")
g.systemd_service_restart("network")
# Bulk operations
vmware_services = ["vmtoolsd", "vmware-tools", "open-vm-tools"]
g.systemd_services_disable_multiple(vmware_services)
g.systemd_services_mask(vmware_services)
# Query and analysis
failed = g.systemd_list_failed_services()
deps = g.systemd_get_service_dependencies("network.service")
active_services = g.systemd_list_services(state="active")
# Daemon management
g.systemd_daemon_reload()
# Create network configuration files
g.networkd_create_network_file(
name="10-eth0",
match={"Name": "eth0"},
network={"Address": "192.168.1.100/24", "Gateway": "192.168.1.1"},
)
# Migrate from legacy formats
g.networkd_migrate_from_ifcfg("eth0") # RHEL/Fedora ifcfg
g.networkd_migrate_from_networkmanager()
# Create bridge for KVM
g.networkd_create_bridge_network("br0", ["eth0"])
# Quick helpers
g.networkd_create_dhcp_network("eth1")
g.networkd_create_static_network("eth2", "192.168.1.50/24", "192.168.1.1")
g.networkd_enable_networkd()
# Query journal logs
logs = g.journal_get(lines=100, priority="err", since="-1h")
service_logs = g.journal_get_service("sshd", lines=50)
boot_logs = g.journal_get_since_boot(boot_offset=-1)
# Boot analysis
boots = g.journal_list_boots()
boot_id = g.journal_get_boot_id()
# Journal management
usage = g.journal_get_disk_usage()
g.journal_vacuum(size="100M") # Clean up old logs
g.journal_verify() # Check consistency
# Create custom service
g.units_create_service_unit(
name="myapp",
description="My Application",
exec_start="/usr/bin/myapp",
after=["network.target"],
restart="always"
)
# Create scheduled tasks with timers
g.units_create_timer_unit(
name="backup",
description="Daily Backup",
on_calendar="daily",
service="backup.service"
)
# Create mount units
g.units_create_mount_unit(
name="data",
what="/dev/sdb1",
where="/mnt/data",
type="ext4"
)
# Unit file management
config = g.units_read_unit_file("sshd.service")
g.units_modify_unit_file("myapp.service", "Service", "Restart", "always")
g.units_validate_unit_file("myapp.service")
# Boot performance analysis
perf = g.units_analyze_boot_performance()
chain = g.units_analyze_critical_chain()
blame = g.units_analyze_blame()
timers = g.units_list_timers()
Benefits:
VMCraft v9.1 delivered major performance improvements, enterprise features, and enhanced libguestfs API parity.
Parallel Mount Operations - 2-3x faster mounting for multi-partition VMs
# Mount multiple partitions concurrently
devices = [
("/dev/nbd0p1", "/boot"),
("/dev/nbd0p2", "/"),
("/dev/nbd0p3", "/home"),
]
results = g.mount_all_parallel(devices, max_workers=4)
Intelligent Caching - 30-40% reduction in system calls
NBD Retry Logic - 95%+ success rate on transient connection failures
Mount Fallback Strategies - Automatic recovery from damaged filesystems
# Initialize partition table
g.part_init("/dev/sda", "gpt")
# Add partition
g.part_add("/dev/sda", "primary", 2048, -1) # Start at 2048, fill to end
# Delete partition
g.part_del("/dev/sda", 1)
# Create partition table + single partition (convenience)
g.part_disk("/dev/sda", "gpt")
# Set GPT partition name
g.part_set_name("/dev/sda", 1, "EFI System")
# Set GPT partition type GUID
g.part_set_gpt_type("/dev/sda", 1, "C12A7328-F81F-11D2-BA4B-00A0C93EC93B")
# Get partition table type
parttype = g.part_get_parttype("/dev/sda") # "gpt", "msdos", or "unknown"
# Create physical volume
result = g.pvcreate(["/dev/sda1"])
# Create volume group
result = g.vgcreate("vg_data", ["/dev/sda1"])
# Create logical volume (with size)
result = g.lvcreate("lv_root", "vg_data", size_mb=10240)
# Create logical volume (with extents)
result = g.lvcreate("lv_home", "vg_data", extents="100%FREE")
# Resize logical volume
result = g.lvresize("/dev/vg_data/lv_root", 20480)
# Remove logical volume
result = g.lvremove("/dev/vg_data/lv_home", force=True)
# Remove volume group
result = g.vgremove("vg_data", force=True)
# Initialize Augeas
g.aug_init()
# Get configuration value
device = g.aug_get("/files/etc/fstab/1/spec")
# Set configuration value
g.aug_set("/files/etc/fstab/1/dump", "0")
# Save changes to disk
g.aug_save()
# Match paths by pattern
entries = g.aug_match("/files/etc/fstab/*")
# Insert new node
g.aug_insert("/files/etc/fstab/1", "01", before=True)
# Remove nodes
count = g.aug_rm("/files/etc/fstab/#comment")
# Define variable
g.aug_defvar("root", "/files/etc/fstab/*[file='/']")
# Define node variable
count, created = g.aug_defnode("tmp", "/files/etc/fstab/*[file='/tmp']", None)
# Close Augeas
g.aug_close()
# Extract tarball to guest
g.tar_in("/tmp/myapp.tar.gz", "/opt", compress="gzip")
# Pack guest directory to tarball
g.tar_out("/etc", "/tmp/etc-backup.tar.gz", compress="gzip")
# Convenience wrappers for gzip
g.tgz_in("/tmp/app.tar.gz", "/opt")
g.tgz_out("/var/log", "/tmp/logs.tar.gz")
# Get device size in bytes
size_bytes = g.blockdev_getsize64("/dev/nbd0")
# Get device size in 512-byte sectors
sectors = g.blockdev_getsz("/dev/nbd0")
# Copy data using dd
g.dd_copy("/dev/nbd0", "/tmp/disk-backup.img", count=2048, blocksize=512)
VMCraft delivers exceptional performance through native Python implementation:
| Operation | Time | Notes |
|---|---|---|
| Launch | ~1.9s | NBD connection + storage activation |
| NBD Connection | ~1.4s | Attach disk via qemu-nbd |
| OS Inspection | ~0.3s | Detect OS type, version, distribution |
| File Read | <50ms | Read file contents |
| File Write | <100ms | Write file to disk |
| Registry Read | ~150ms | Windows registry value read |
| Registry Write | ~200ms | Windows registry value write |
| Backup (tar.gz) | Depends on size | Streaming compression |
from hyper2kvm.core.vmcraft import VMCraft
# Context manager for automatic cleanup
with VMCraft() as g:
# Add disk image
g.add_drive_opts("/path/to/disk.vmdk", readonly=True, format="vmdk")
# Launch
g.launch()
# Inspect OS
roots = g.inspect_os()
for root in roots:
print(f"Type: {g.inspect_get_type(root)}")
print(f"Distro: {g.inspect_get_distro(root)}")
print(f"Product: {g.inspect_get_product_name(root)}")
# Mount filesystem
mounts = g.inspect_get_mountpoints(roots[0])
for mp, dev in mounts.items():
g.mount(dev, mp)
# Read/write files
hostname = g.cat("/etc/hostname")
g.write("/etc/motd", "Welcome to VMCraft!\n")
# Automatic cleanup on exit
with VMCraft() as g:
g.add_drive_opts("/path/to/disk.vmdk", readonly=True)
g.launch()
# Get performance metrics
metrics = g.get_performance_metrics()
print(f"Launch time: {metrics['launch_time_s']:.2f}s")
print(f"NBD connect: {metrics['nbd_connect_time_s']:.2f}s")
print(f"Inspection: {metrics['inspection_time_s']:.2f}s")
VMCraft automatically detects operating systems and versions across all major platforms.
Supported distributions:
Detection methods (priority order):
/etc/os-release (systemd standard)/etc/lsb-release (LSB standard)/etc/redhat-release, /etc/debian_version, etc.)/etc/issue (fallback)with VMCraft() as g:
g.add_drive_opts("/path/to/linux.vmdk", readonly=True)
g.launch()
roots = g.inspect_os()
for root in roots:
os_type = g.inspect_get_type(root) # "linux"
distro = g.inspect_get_distro(root) # "ubuntu", "fedora", "rhel"
major_ver = g.inspect_get_major_version(root) # 24, 40, 10
minor_ver = g.inspect_get_minor_version(root) # 04, 0, 0
product = g.inspect_get_product_name(root) # "Ubuntu 24.04 LTS"
Supported versions:
Detection methods (priority order):
with VMCraft() as g:
g.add_drive_opts("/path/to/windows.vmdk", readonly=True)
g.launch()
roots = g.inspect_os()
for root in roots:
os_type = g.inspect_get_type(root) # "windows"
product = g.inspect_get_product_name(root) # "Windows 11 Pro"
major_ver = g.inspect_get_major_version(root) # 10
minor_ver = g.inspect_get_minor_version(root) # 0
Complete file manipulation API:
# Basic operations
g.is_file("/etc/passwd") # Check if file
g.is_dir("/etc") # Check if directory
g.exists("/etc/hostname") # Check existence
# Read operations
content = g.cat("/etc/hostname") # Read entire file
lines = g.read_lines("/etc/passwd") # Read lines as list
g.download("/etc/hosts", "/tmp/hosts") # Download from VM
# Write operations
g.write("/etc/motd", "Welcome!\n") # Write file
g.upload("/tmp/file", "/etc/config") # Upload to VM
g.append("/var/log/app.log", "Entry\n") # Append to file
# Directory operations
files = g.ls("/etc") # List directory
all_files = g.find("/var/log") # Recursive find
configs = g.find_files("/etc", "*.conf") # Pattern search
# Manipulation
g.mkdir_p("/opt/myapp") # Create directory
g.cp("/etc/hosts", "/etc/hosts.bak") # Copy file
g.mv("/tmp/old", "/tmp/new") # Move/rename
g.rm_f("/tmp/tempfile") # Remove file
g.touch("/var/lock/app.lock") # Touch file
# Permissions
g.chmod(0o644, "/etc/config") # Set mode
g.chown(1000, 1000, "/home/user/file") # Set owner
g.set_permissions("/etc/shadow", owner="root", group="shadow", mode=0o640)
# Advanced
checksum = g.checksum("sha256", "/etc/passwd") # Calculate checksum
age = g.file_age("/var/log/messages") # Get file age
# List filesystems
filesystems = g.list_filesystems()
for fs, type in filesystems.items():
print(f"{fs}: {type}") # "/dev/sda1: ext4"
# List partitions
partitions = g.list_partitions()
for part in partitions:
print(part) # "/dev/sda1", "/dev/sda2"
# Get filesystem info
fs_type = g.vfs_type("/dev/sda1") # "ext4"
uuid = g.vfs_uuid("/dev/sda1") # "abc-123-..."
label = g.vfs_label("/dev/sda1") # "root"
# Mount/unmount
g.mount("/dev/sda1", "/")
g.mount("/dev/sda2", "/boot", readonly=True)
g.umount("/boot")
g.umount_all()
Comprehensive offline Windows registry access:
with VMCraft() as g:
g.add_drive_opts("/path/to/windows.vmdk", readonly=False)
g.launch()
# Read registry value
product_name = g.win_registry_read(
"SOFTWARE",
r"Microsoft\Windows NT\CurrentVersion",
"ProductName"
)
print(f"Windows: {product_name}")
# Write registry value
g.win_registry_write(
"SOFTWARE",
r"Microsoft\MyApp",
"Version",
"1.0.0"
)
# List keys
keys = g.win_registry_list_keys(
"SOFTWARE",
r"Microsoft\Windows\CurrentVersion"
)
# List values
values = g.win_registry_list_values(
"SOFTWARE",
r"Microsoft\Windows NT\CurrentVersion"
)
Supported hives:
SOFTWARE - Software configurationSYSTEM - System configurationSAM - Security Accounts Manager (future)Inject drivers into Windows DriverStore for first-boot activation:
with VMCraft() as g:
g.add_drive_opts("/path/to/windows.vmdk", readonly=False)
g.launch()
# Inject VirtIO driver
result = g.win_inject_driver("/path/to/virtio-win/viostor/w11/amd64")
if result["ok"]:
print(f"Driver injected: {result['destination']}")
print(f"Files copied: {result['files_copied']}")
for file in result['files']:
print(f" - {file}")
else:
print(f"Error: {result['error']}")
Manage Windows user accounts:
# List users
users = g.win_list_users()
for user in users:
print(f"User: {user['username']}, RID: {user['rid']}")
# Get user info
info = g.win_get_user("Administrator")
print(f"Username: {info['username']}")
print(f"Full name: {info['fullname']}")
print(f"RID: {info['rid']}")
print(f"Groups: {info['groups']}")
Control Windows services:
# List services
services = g.win_list_services()
for svc in services:
print(f"{svc['name']}: {svc['display_name']} ({svc['state']})")
# Get service info
info = g.win_get_service("wuauserv")
print(f"State: {info['state']}")
print(f"Start type: {info['start_type']}")
print(f"Description: {info['description']}")
# Set service state
g.win_set_service_state("wuauserv", "stopped")
g.win_set_service_start_type("wuauserv", "manual")
Manage scheduled tasks:
# List scheduled tasks
tasks = g.win_list_scheduled_tasks()
for task in tasks:
print(f"{task['name']}: {task['state']}")
# Get task details
details = g.win_get_scheduled_task("UpdateCheck")
print(f"Trigger: {details['trigger']}")
print(f"Action: {details['action']}")
print(f"Last run: {details['last_run']}")
Control systemd/init services:
# List services
services = g.linux_list_services()
for svc in services:
print(f"{svc['name']}: {svc['state']}")
# Get service info
info = g.linux_get_service("sshd")
print(f"Enabled: {info['enabled']}")
print(f"Active: {info['active']}")
print(f"State: {info['state']}")
# Enable/disable services
g.linux_enable_service("sshd")
g.linux_disable_service("bluetooth")
VMCraft v9.0 introduces enterprise-grade intelligence capabilities.
AI-powered anomaly detection and behavior prediction.
metrics = {
"cpu_usage": [45, 50, 48, 52, 49, 95, 50, 48],
"memory_usage": [60, 62, 61, 63, 59, 61, 60, 62],
"disk_io": [100, 105, 102, 108, 103, 106, 104, 102]
}
# Detect anomalies using z-score method
result = g.ml_detect_anomalies(metrics, "cpu_usage")
if result["anomalies_found"]:
print(f"Anomalies detected: {result['anomaly_count']}")
for anomaly in result['anomalies']:
print(f" Index {anomaly['index']}: {anomaly['value']} "
f"(z-score: {anomaly['z_score']:.2f})")
metrics = {
"cpu_usage": [75, 78, 76, 80, 77],
"memory_usage": [45, 48, 46, 49, 47],
"disk_io": [50, 52, 51, 53, 50],
"network_io": [30, 32, 31, 33, 31]
}
classification = g.ml_classify_workload(metrics)
print(f"Workload type: {classification['workload_type']}")
# "compute_intensive", "memory_intensive", "io_intensive", "balanced", "idle"
print(f"Confidence: {classification['confidence']:.1%}")
print(f"Dominant resource: {classification['dominant_resource']}")
historical_data = [
{"timestamp": "2026-01-01T00:00:00", "cpu_usage": 45, "memory_usage": 60},
{"timestamp": "2026-01-01T01:00:00", "cpu_usage": 50, "memory_usage": 62},
{"timestamp": "2026-01-01T02:00:00", "cpu_usage": 48, "memory_usage": 61},
# ... more data points
]
prediction = g.ml_predict_behavior(historical_data)
print(f"Predicted CPU: {prediction['cpu_usage']:.1f}%")
print(f"Predicted Memory: {prediction['memory_usage']:.1f}%")
print(f"Confidence: {prediction['confidence']:.1%}")
print(f"Trend: {prediction['trend']}") # "increasing", "decreasing", "stable"
# Train baseline from normal operations
training_data = [
{"cpu_usage": 45, "memory_usage": 60, "disk_io": 100},
{"cpu_usage": 50, "memory_usage": 62, "disk_io": 105},
# ... normal operation data
]
baseline = g.ml_train_baseline(training_data)
print(f"Baseline established:")
print(f" CPU: {baseline['cpu_usage']['mean']:.1f}% "
f"(±{baseline['cpu_usage']['std']:.1f})")
print(f" Memory: {baseline['memory_usage']['mean']:.1f}% "
f"(±{baseline['memory_usage']['std']:.1f})")
# Later, detect behavior changes
current_metrics = {"cpu_usage": 85, "memory_usage": 90}
change_detected = g.ml_detect_behavior_change(current_metrics)
if change_detected["behavior_changed"]:
print(f"Behavior change detected!")
print(f" Severity: {change_detected['severity']}")
for change in change_detected['changes']:
print(f" {change['metric']}: {change['deviation']:.1f} std deviations")
Multi-cloud migration planning and cost optimization.
system_info = {
"os_type": "linux",
"distro": "ubuntu",
"version": "24.04",
"cpu_cores": 8,
"memory_gb": 16,
"disk_gb": 500,
"has_cloud_init": True,
"has_ssh": True,
"open_vm_tools": False
}
readiness = g.cloud_analyze_readiness(system_info)
print(f"Cloud readiness score: {readiness['score']}/100")
print(f"Ready: {readiness['ready']}")
print(f"Confidence: {readiness['confidence']:.1%}")
print("\nChecks:")
for check in readiness['checks']:
status = "✓" if check['passed'] else "✗"
print(f" {status} {check['name']}: {check['message']}")
print("\nRecommendations:")
for rec in readiness['recommendations']:
print(f" - {rec}")
requirements = {
"cpu_cores": 8,
"memory_gb": 16,
"disk_gb": 500,
"iops": 3000,
"network_throughput_mbps": 1000
}
# AWS recommendations
aws_instances = g.cloud_recommend_instance_type(requirements, "aws")
print("AWS recommendations:")
for instance in aws_instances[:3]:
print(f" {instance['instance_type']}: "
f"{instance['cpu_cores']} cores, "
f"{instance['memory_gb']} GB RAM "
f"(fit score: {instance['fit_score']}/100)")
# Azure recommendations
azure_instances = g.cloud_recommend_instance_type(requirements, "azure")
# GCP recommendations
gcp_instances = g.cloud_recommend_instance_type(requirements, "gcp")
usage_profile = {
"hours_per_month": 730, # 24/7
"cpu_cores": 8,
"memory_gb": 16,
"storage_gb": 500,
"data_transfer_gb": 1000
}
# AWS costs
aws_costs = g.cloud_calculate_costs(usage_profile, "aws")
print(f"AWS estimated cost: ${aws_costs['total_monthly_cost']:.2f}/month")
print(f" Compute: ${aws_costs['compute_cost']:.2f}")
print(f" Storage: ${aws_costs['storage_cost']:.2f}")
print(f" Data transfer: ${aws_costs['data_transfer_cost']:.2f}")
# Multi-cloud comparison
comparison = g.cloud_compare_providers(requirements)
for provider in comparison['providers']:
print(f"{provider['cloud']}: ${provider['total_cost']:.2f}/month "
f"({provider['instance_type']})")
migration_plan = g.cloud_generate_migration_plan(system_info, "aws")
print(f"Migration plan to {migration_plan['target_cloud']}:")
print(f"Estimated duration: {migration_plan['estimated_duration']}")
print(f"Recommended instance: {migration_plan['recommended_instance']}")
print(f"Estimated cost: ${migration_plan['estimated_monthly_cost']:.2f}/month")
print("\nPhases:")
for phase in migration_plan['phases']:
print(f" {phase['phase']}: {phase['name']}")
print(f" Duration: {phase['duration']}")
for task in phase['tasks']:
print(f" - {task}")
RTO/RPO planning and DR testing.
system_info = {
"business_criticality": "high", # "critical", "high", "medium", "low"
"data_change_rate_gb_per_hour": 10,
"acceptable_data_loss_hours": 1,
"acceptable_downtime_minutes": 15
}
assessment = g.dr_assess_recovery_requirements(system_info)
print(f"Recovery Tier: {assessment['tier']}")
print(f" Description: {assessment['description']}")
print(f" Target RTO: {assessment['target_rto']}")
print(f" Target RPO: {assessment['target_rpo']}")
print("\nRecommendations:")
for rec in assessment['recommendations']:
print(f" - {rec}")
requirements = {
"tier": 1, # Tier 0-3
"data_size_gb": 500,
"change_rate_gb_per_day": 50,
"retention_days": 30
}
strategy = g.dr_create_backup_strategy(requirements)
print(f"Backup strategy for Tier {strategy['tier']}:")
print(f" Full backup frequency: {strategy['full_backup_frequency']}")
print(f" Incremental frequency: {strategy['incremental_frequency']}")
print(f" Snapshot frequency: {strategy['snapshot_frequency']}")
print(f" Retention period: {strategy['retention_period']}")
print(f" Estimated storage: {strategy['estimated_storage_gb']} GB")
print("\nBackup schedule:")
for schedule in strategy['schedule']:
print(f" {schedule['type']}: {schedule['frequency']} ({schedule['retention']})")
backup_config = {
"full_backup_frequency_hours": 24,
"incremental_frequency_hours": 6,
"snapshot_frequency_minutes": 15,
"restore_time_hours": 2,
"verification_time_hours": 0.5
}
rto_rpo = g.dr_calculate_rto_rpo(backup_config)
print(f"Achievable metrics:")
print(f" RTO (Recovery Time Objective): {rto_rpo['rto']}")
print(f" RPO (Recovery Point Objective): {rto_rpo['rpo']}")
print(f" Expected data loss: {rto_rpo['expected_data_loss']}")
print(f" Meets requirements: {rto_rpo['meets_requirements']}")
Multi-standard compliance logging.
# Log audit event
g.audit_log_event(
category="access",
action="file_read",
details={"path": "/etc/shadow", "result": "success"},
severity="INFO",
user="admin"
)
# Log critical security event
g.audit_log_event(
category="security",
action="permission_change",
details={"path": "/etc/passwd", "old_mode": "644", "new_mode": "666"},
severity="CRITICAL",
user="root"
)
# Query events
events = g.audit_query_events(
category="security",
severity="CRITICAL",
start_time="2026-01-01T00:00:00",
end_time="2026-01-31T23:59:59",
limit=100
)
for event in events:
print(f"[{event['timestamp']}] {event['severity']}: "
f"{event['category']}/{event['action']}")
print(f" User: {event['user']}")
print(f" Details: {event['details']}")
# Generate SOC2 compliance report
report = g.audit_generate_compliance_report("SOC2", period_days=90)
print(f"SOC2 Compliance Report:")
print(f" Period: {report['start_date']} to {report['end_date']}")
print(f" Total events: {report['total_events']}")
print(f" Compliance score: {report['compliance_score']}/100")
print("\nRequirements:")
for req in report['requirements']:
status = "✓" if req['met'] else "✗"
print(f" {status} {req['requirement']}: {req['evidence_count']} events")
# Other standards: PCI-DSS, HIPAA, GDPR
pci_report = g.audit_generate_compliance_report("PCI-DSS", period_days=90)
hipaa_report = g.audit_generate_compliance_report("HIPAA", period_days=90)
gdpr_report = g.audit_generate_compliance_report("GDPR", period_days=90)
# Verify audit log integrity
verification = g.audit_verify_integrity()
print(f"Audit log integrity:")
print(f" Total entries: {verification['total_entries']}")
print(f" Verified: {verification['verified_count']}")
print(f" Failed: {verification['failed_count']}")
print(f" Integrity: {verification['integrity_percentage']:.1f}%")
if verification['failed_count'] > 0:
print("\nFailed checksums:")
for entry in verification['failed_entries']:
print(f" Entry {entry['id']}: checksum mismatch")
Automated resource management and auto-scaling.
current_metrics = {
"cpu_usage_percent": 75,
"memory_usage_percent": 60,
"disk_io_mbps": 150,
"network_io_mbps": 200
}
analysis = g.orchestrate_analyze_resource_usage(current_metrics)
print(f"Resource efficiency: {analysis['efficiency_score']}/100")
print(f"Bottleneck: {analysis['bottleneck']}") # "cpu", "memory", "disk", "network", "none"
print(f"Scaling recommendation: {analysis['scaling_recommendation']}") # "scale_up", "scale_down", "maintain"
print("\nResource utilization:")
for resource, util in analysis['utilization'].items():
print(f" {resource}: {util['percentage']:.1f}% ({util['status']})")
# Create auto-scaling policy
policy = g.orchestrate_create_scaling_policy(
policy_name="production-web",
policy_type="moderate" # "aggressive", "moderate", "conservative"
)
print(f"Scaling policy: {policy['name']}")
print(f" Type: {policy['type']}")
print(f" Scale up threshold: {policy['scale_up_threshold']}%")
print(f" Scale down threshold: {policy['scale_down_threshold']}%")
print(f" Cooldown period: {policy['cooldown_seconds']}s")
# Execute scaling action
action_result = g.orchestrate_execute_scaling_action(
action="scale_up",
current_capacity=4,
reason="CPU threshold exceeded (85%)"
)
print(f"Scaling action: {action_result['action']}")
print(f" From: {action_result['old_capacity']} → {action_result['new_capacity']}")
print(f" Status: {action_result['status']}")
workloads = [
{"id": "web1", "cpu": 80, "memory": 60},
{"id": "web2", "cpu": 45, "memory": 40},
{"id": "web3", "cpu": 90, "memory": 70}
]
available_resources = {
"total_cpu_cores": 16,
"total_memory_gb": 64
}
balanced = g.orchestrate_balance_workload(workloads, available_resources)
print(f"Workload balancing plan:")
for allocation in balanced['allocations']:
print(f" {allocation['workload_id']}: "
f"{allocation['cpu_cores']} cores, "
f"{allocation['memory_gb']} GB RAM "
f"(node: {allocation['target_node']})")
print(f"\nBalance score: {balanced['balance_score']}/100")
print(f"Efficiency gain: {balanced['efficiency_gain']:.1f}%")
# Backup critical files
result = g.backup_files(
paths=["/etc", "/var/lib/app", "/home/user/.config"],
dest_archive="/tmp/backup.tar.gz",
compression="gzip" # "gzip", "bzip2", "xz"
)
print(f"Backup created: {result['archive_path']}")
print(f" Files archived: {result['files_archived']}")
print(f" Size: {result['size_bytes']} bytes")
print(f" Compression ratio: {result['compression_ratio']:.1%}")
# Restore from backup
restore_result = g.restore_files(
"/tmp/backup.tar.gz",
dest_path="/restore"
)
print(f"Files restored: {restore_result['files_extracted']}")
print(f" Restored to: {restore_result['destination']}")
# Audit file permissions
audit = g.audit_permissions("/")
print(f"Security audit results:")
print(f" World-writable files: {audit['world_writable_count']}")
print(f" Setuid files: {audit['setuid_count']}")
print(f" Setgid files: {audit['setgid_count']}")
print("\nWorld-writable files (top 10):")
for file in audit['world_writable'][:10]:
print(f" {file}")
print("\nSetuid binaries:")
for file in audit['setuid_files']:
print(f" {file}")
# Analyze disk usage
usage = g.analyze_disk_usage("/", top_n=20)
print(f"Disk usage analysis:")
print(f" Total size: {usage['total_bytes']} bytes")
print(f" Total files: {usage['file_count']}")
print(f" Total directories: {usage['directory_count']}")
print("\nLargest directories:")
for dir in usage['largest_directories']:
print(f" {dir['size_mb']:.1f} MB: {dir['path']}")
# Find large files
large_files = g.find_large_files(min_size_mb=100)
print("\nFiles > 100 MB:")
for file in large_files:
print(f" {file['size_mb']:.1f} MB: {file['path']}")
# Find duplicates
duplicates = g.find_duplicates(min_size_mb=1)
print(f"\nFound {len(duplicates)} sets of duplicate files:")
for dup_set in duplicates[:5]:
print(f" {len(dup_set['files'])} copies of {dup_set['files'][0]} "
f"({dup_set['size_mb']:.1f} MB each)")
# Forensic timeline
timeline = g.analysis_create_timeline(
"/var/log",
start_time="2026-01-01T00:00:00",
end_time="2026-01-31T23:59:59"
)
print(f"Timeline events: {timeline['event_count']}")
for event in timeline['events'][:10]:
print(f" [{event['timestamp']}] {event['type']}: {event['path']}")
# File carving (recover deleted files)
recovered = g.analysis_carve_files("/", file_types=["jpg", "png", "pdf"])
print(f"Recovered files: {recovered['count']}")
for file in recovered['files']:
print(f" {file['type']}: {file['path']} ({file['size_bytes']} bytes)")
# Export VM as OVF/OVA
export_result = g.export_vm(
"/path/to/source.vmdk",
"/output/exported.ova",
format="ova", # "ova", "ovf", "vmdk"
compress=True
)
print(f"Exported to: {export_result['output_path']}")
print(f" Format: {export_result['format']}")
print(f" Size: {export_result['size_bytes']} bytes")
print(f" Checksum: {export_result['checksum']}")
# Fedora/RHEL/CentOS
sudo dnf install -y qemu-utils util-linux lvm2 cryptsetup
# Ubuntu/Debian
sudo apt install -y qemu-utils util-linux lvm2 cryptsetup-bin
# Arch Linux
sudo pacman -S qemu-base util-linux lvm2 cryptsetup
# Fedora/RHEL/CentOS
sudo dnf install -y ntfs-3g libhivex
# Ubuntu/Debian
sudo apt install -y ntfs-3g libhivex-bin
# Arch Linux
sudo pacman -S ntfs-3g hivex
# Software RAID support
sudo dnf install mdadm # Fedora/RHEL
sudo apt install mdadm # Ubuntu/Debian
# ZFS support
sudo dnf install zfs # Fedora
sudo apt install zfsutils-linux # Ubuntu/Debian
# exFAT support
sudo dnf install exfat-utils # Fedora/RHEL
sudo apt install exfat-fuse # Ubuntu/Debian
New Modules (5):
ml_analyzer.py - AI/ML analytics (7 methods)cloud_optimizer.py - Cloud optimization (6 methods)disaster_recovery.py - DR planning (6 methods)audit_trail.py - Compliance logging (7 methods)resource_orchestrator.py - Auto-scaling (7 methods)Statistics:
New Modules (3):
scheduled_tasks.py - Windows Task Scheduler (6 methods)advanced_analysis.py - Forensic analysis (7 methods)export.py - VM export (5 methods)Statistics:
New Modules (3):
security.py - Security auditing (8 methods)optimization.py - Disk optimization (6 methods)windows_applications.py - App detection (5 methods)Statistics:
Initial modular release:
To add new functionality:
hyper2kvm/core/vmcraft/main.py initialization__init__.py if public APItests/unit/test_core/test_vmcraft/self.logger for all logging# Unit tests
pytest tests/unit/test_core/test_vmcraft/ -v
# Integration tests
pytest tests/integration/test_core/test_vmcraft/ -v
# Specific module
pytest tests/unit/test_core/test_vmcraft/test_linux_detection.py -v
# With coverage
pytest tests/unit/test_core/test_vmcraft/ --cov=hyper2kvm.core.vmcraft --cov-report=html
SPDX-License-Identifier: LGPL-3.0-or-later
Last Updated: 2026-01-25 VMCraft Version: v9.0 Maintained by: hyper2kvm project