Version: 1.0
Status: Production-Ready
OpenAPI Spec: Available at /openapi.json
The hyper2kvm Worker REST API provides a production-grade HTTP interface for VM migration job orchestration. Built with FastAPI, it offers automatic OpenAPI/Swagger documentation, type-safe request validation, and real-time progress streaming via Server-Sent Events (SSE).
Interactive Docs: Available at /docs
# Install API dependencies
pip install -r requirements-api.txt
# Or install individual packages
pip install fastapi uvicorn sse-starlette
# Development mode (auto-reload)
uvicorn hyper2kvm.worker.api:app --reload --host 0.0.0.0 --port 8000
# Production mode (with Gunicorn)
gunicorn hyper2kvm.worker.api:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
# Using Docker
docker run -p 8000:8000 ghcr.io/ssahani/hyper2kvm-worker-api:latest
http://localhost:8000
All endpoints return JSON except /metrics (Prometheus text format) and /jobs/{id}/events/stream (Server-Sent Events).
POST /jobs
Submit a new VM migration job.
Query Parameters:
queue (boolean, optional) - If true, add the job to the queue for worker assignment
Request Body:
{
"job_id": "convert-vm-123",
"operation": "convert",
"image": {
"path": "/data/vm.vmdk",
"format": "vmdk"
},
"parameters": {
"output_format": "qcow2",
"compress": true
},
"execution_policy": {
"timeout_seconds": 3600,
"retry_count": 3,
"priority": 75,
"idempotent": true
},
"audit_info": {
"requested_by": "admin@example.com",
"ticket_id": "TICKET-456",
"tags": ["production", "critical"]
}
}
Response (201 Created):
{
"job_id": "convert-vm-123",
"state": "QUEUED",
"message": "Job queued successfully",
"queue_position": 5
}
cURL Example:
curl -X POST http://localhost:8000/jobs \
-H "Content-Type: application/json" \
-d @job_spec.json
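The same request can be made from Python. Below is a minimal sketch using only the standard library; `build_job_spec` and `submit_job` are illustrative helper names, not part of the hyper2kvm package:

```python
import json
import urllib.request

def build_job_spec(job_id, image_path, image_format="vmdk", output_format="qcow2"):
    """Assemble a minimal JobSpec payload for POST /jobs."""
    return {
        "job_id": job_id,
        "operation": "convert",
        "image": {"path": image_path, "format": image_format},
        "parameters": {"output_format": output_format},
    }

def submit_job(spec, base_url="http://localhost:8000", queue=True):
    """POST the spec to /jobs?queue=... and return the parsed response."""
    url = f"{base_url}/jobs?queue={'true' if queue else 'false'}"
    req = urllib.request.Request(
        url,
        data=json.dumps(spec).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

if __name__ == "__main__":
    print(submit_job(build_job_spec("convert-vm-123", "/data/vm.vmdk")))
```

A successful submission returns the 201 body shown above, including `state` and `queue_position`.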
GET /jobs/{job_id}
Retrieve current job status and history.
Response (200 OK):
{
"job_id": "convert-vm-123",
"spec": { /* JobSpec object */ },
"state": "RUNNING",
"state_history": [
{
"state": "CREATED",
"timestamp": "2026-01-31T10:00:00Z",
"reason": "Job submitted via REST API"
},
{
"state": "VALIDATED",
"timestamp": "2026-01-31T10:00:01Z",
"reason": "Job spec validated successfully"
},
{
"state": "QUEUED",
"timestamp": "2026-01-31T10:00:02Z",
"reason": "Added to queue with priority 75"
},
{
"state": "ASSIGNED",
"timestamp": "2026-01-31T10:01:00Z",
"reason": "Assigned to worker worker-1"
},
{
"state": "RUNNING",
"timestamp": "2026-01-31T10:01:05Z",
"reason": "Worker started execution"
}
],
"result": null,
"latest_event": {
"job_id": "convert-vm-123",
"timestamp": "2026-01-31T10:02:30Z",
"phase": "conversion",
"percentage": 45,
"message": "Converting disk format: 45% complete",
"details": {}
}
}
cURL Example:
curl http://localhost:8000/jobs/convert-vm-123
GET /jobs
List all jobs with optional filtering.
Query Parameters:
state (JobState, optional) - Filter by job state
limit (integer, optional, default=100, max=1000) - Maximum jobs to return
Response (200 OK):
{
"total": 3,
"jobs": [
{
"job_id": "convert-vm-123",
"state": "RUNNING",
"created_at": "2026-01-31T10:00:00Z",
"is_terminal": false
},
{
"job_id": "convert-vm-124",
"state": "COMPLETED",
"created_at": "2026-01-31T09:00:00Z",
"is_terminal": true
},
{
"job_id": "convert-vm-125",
"state": "QUEUED",
"created_at": "2026-01-31T10:05:00Z",
"is_terminal": false
}
]
}
cURL Examples:
# List all jobs
curl http://localhost:8000/jobs
# List only running jobs
curl "http://localhost:8000/jobs?state=RUNNING"
# List with limit
curl "http://localhost:8000/jobs?limit=10"
DELETE /jobs/{job_id}
Cancel a running or queued job.
Response (200 OK):
{
"job_id": "convert-vm-123",
"state": "CANCELLED",
"message": "Job cancelled successfully"
}
Error Response (400 Bad Request):
{
"error": {
"code": 400,
"message": "Job convert-vm-123 is already in terminal state COMPLETED",
"path": "/jobs/convert-vm-123",
"timestamp": "2026-01-31T10:15:00Z"
}
}
cURL Example:
curl -X DELETE http://localhost:8000/jobs/convert-vm-123
GET /jobs/{job_id}/events
Retrieve all progress events for a job (polling mode).
Query Parameters:
since (ISO 8601 timestamp, optional) - Only return events after this time
Response (200 OK):
[
{
"job_id": "convert-vm-123",
"timestamp": "2026-01-31T10:01:05Z",
"phase": "initialization",
"percentage": 0,
"message": "Starting job execution",
"details": {}
},
{
"job_id": "convert-vm-123",
"timestamp": "2026-01-31T10:01:30Z",
"phase": "conversion",
"percentage": 25,
"message": "Converting disk format: 25% complete",
"details": {
"bytes_processed": "536870912",
"total_bytes": "2147483648"
}
},
{
"job_id": "convert-vm-123",
"timestamp": "2026-01-31T10:02:30Z",
"phase": "conversion",
"percentage": 50,
"message": "Converting disk format: 50% complete",
"details": {
"bytes_processed": "1073741824",
"total_bytes": "2147483648"
}
}
]
cURL Examples:
# Get all events
curl http://localhost:8000/jobs/convert-vm-123/events
# Get events since timestamp
curl "http://localhost:8000/jobs/convert-vm-123/events?since=2026-01-31T10:02:00Z"
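A polling client can pass the newest timestamp it has seen as the next `since` value, fetching only new events on each round. A sketch with hypothetical helper names, assuming the response shape above:

```python
import json
import urllib.parse
import urllib.request

def fetch_events(job_id, since=None, base_url="http://localhost:8000"):
    """GET /jobs/{job_id}/events, optionally filtered with ?since=<ISO 8601>."""
    url = f"{base_url}/jobs/{job_id}/events"
    if since:
        url += "?since=" + urllib.parse.quote(since)
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read())

def latest_timestamp(events, default=None):
    """Newest event timestamp, usable as the next `since` value.
    Equal-format ISO 8601 UTC timestamps compare correctly as strings."""
    if not events:
        return default
    return max(e["timestamp"] for e in events)
```

For real-time updates without polling, prefer the SSE endpoint described next.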
GET /jobs/{job_id}/events/stream
Stream job progress events in real-time using Server-Sent Events (SSE).
Response: Server-Sent Events (text/event-stream)
Event Types:
progress - Progress update event
complete - Job reached terminal state
error - Error occurred
Example SSE Stream:
event: progress
data: {"job_id":"convert-vm-123","timestamp":"2026-01-31T10:01:05Z","phase":"initialization","percentage":0,"message":"Starting job execution","details":{}}
event: progress
data: {"job_id":"convert-vm-123","timestamp":"2026-01-31T10:01:30Z","phase":"conversion","percentage":25,"message":"Converting disk format: 25% complete","details":{"bytes_processed":"536870912"}}
event: progress
data: {"job_id":"convert-vm-123","timestamp":"2026-01-31T10:02:30Z","phase":"conversion","percentage":50,"message":"Converting disk format: 50% complete","details":{"bytes_processed":"1073741824"}}
event: complete
data: {"state":"COMPLETED"}
cURL Example:
# Stream events to terminal
curl -N http://localhost:8000/jobs/convert-vm-123/events/stream
JavaScript EventSource Example:
const source = new EventSource('http://localhost:8000/jobs/convert-vm-123/events/stream');
source.addEventListener('progress', (event) => {
const progress = JSON.parse(event.data);
console.log(`Progress: ${progress.percentage}% - ${progress.message}`);
});
source.addEventListener('complete', (event) => {
const result = JSON.parse(event.data);
console.log(`Job completed with state: ${result.state}`);
source.close();
});
source.addEventListener('error', (event) => {
console.error('Stream error:', event);
source.close();
});
Python Example:
import asyncio
import json

import httpx

async def stream_progress():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream('GET', 'http://localhost:8000/jobs/convert-vm-123/events/stream') as response:
            async for line in response.aiter_lines():
                if line.startswith('data: '):
                    event_data = json.loads(line[6:])
                    # The final "complete" event has no percentage field
                    print(f"Progress: {event_data.get('percentage')}%")

asyncio.run(stream_progress())
POST /workers/register
Register a new worker with its capabilities.
Request Body:
{
"worker_id": "worker-1",
"capabilities": [
"nbd_access",
"lvm_tools",
"mount_operations",
"qemu_img"
],
"system_info": {
"hostname": "worker-node-1",
"os": "Linux",
"kernel": "6.1.0",
"architecture": "x86_64",
"memory_mb": 16384,
"disk_space_mb": 512000
},
"execution_mode": "PRIVILEGED_CONTAINER"
}
Response (201 Created):
{
"worker_id": "worker-1",
"message": "Worker registered successfully",
"capabilities": [
"nbd_access",
"lvm_tools",
"mount_operations",
"qemu_img"
],
"registered_at": "2026-01-31T09:00:00Z"
}
cURL Example:
curl -X POST http://localhost:8000/workers/register \
-H "Content-Type: application/json" \
-d @worker_capabilities.json
POST /workers/{worker_id}/heartbeat
Update worker heartbeat to indicate it’s still alive.
Response (200 OK):
{
"worker_id": "worker-1",
"message": "Heartbeat received",
"timestamp": "2026-01-31T10:00:00Z"
}
cURL Example:
curl -X POST http://localhost:8000/workers/worker-1/heartbeat
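A worker typically sends heartbeats on a timer comfortably below the server's staleness threshold (300s, per the troubleshooting notes). A sketch; `send_heartbeat` and `heartbeat_loop` are illustrative names, and the `send` parameter exists only to make the loop testable:

```python
import json
import time
import urllib.request

def send_heartbeat(worker_id, base_url="http://localhost:8000"):
    """POST /workers/{worker_id}/heartbeat and return the acknowledgement."""
    req = urllib.request.Request(
        f"{base_url}/workers/{worker_id}/heartbeat", method="POST"
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())

def heartbeat_loop(worker_id, interval_seconds=60, iterations=None, send=None):
    """Send a heartbeat every `interval_seconds`; run forever when
    `iterations` is None. Returns the number of heartbeats sent."""
    send = send or send_heartbeat
    sent = 0
    while iterations is None or sent < iterations:
        send(worker_id)
        sent += 1
        time.sleep(interval_seconds)
    return sent
```

A 60-second interval gives several retries before the worker would be considered stale.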
GET /workers
List all registered workers.
Response (200 OK):
{
"total": 3,
"active": 2,
"workers": [
{
"worker_id": "worker-1",
"capabilities": ["nbd_access", "lvm_tools", "mount_operations", "qemu_img"],
"system_info": {
"hostname": "worker-node-1",
"memory_mb": 16384,
"disk_space_mb": 512000
}
},
{
"worker_id": "worker-2",
"capabilities": ["qemu_img"],
"system_info": {
"hostname": "worker-node-2",
"memory_mb": 8192,
"disk_space_mb": 256000
}
}
]
}
cURL Example:
curl http://localhost:8000/workers
DELETE /workers/{worker_id}
Unregister a worker.
Response (200 OK):
{
"worker_id": "worker-1",
"message": "Worker unregistered successfully"
}
cURL Example:
curl -X DELETE http://localhost:8000/workers/worker-1
GET /queue
Get job queue status and statistics.
Response (200 OK):
{
"total_jobs": 12,
"by_priority": {
"10": 1,
"50": 4,
"75": 5,
"90": 2
}
}
cURL Example:
curl http://localhost:8000/queue
POST /queue/dequeue
Dequeue next job matching worker capabilities. Used by workers to poll for jobs.
Request Body:
{
"worker_id": "worker-1",
"capabilities": ["nbd_access", "lvm_tools", "mount_operations", "qemu_img"],
"execution_mode": "PRIVILEGED_CONTAINER"
}
Response (200 OK):
{
"job_id": "convert-vm-125",
"operation": "convert",
"image": {
"path": "/data/vm.vmdk",
"format": "vmdk"
},
"parameters": {
"output_format": "qcow2"
}
/* ... full JobSpec ... */
}
Response (200 OK - No Jobs):
null
cURL Example:
curl -X POST http://localhost:8000/queue/dequeue \
-H "Content-Type: application/json" \
-d @worker_capabilities.json
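Workers that poll /queue/dequeue usually back off when the queue comes up empty. A sketch assuming the request and response shapes above; `next_poll_delay` is an illustrative backoff helper:

```python
import json
import urllib.request

def dequeue_job(worker_id, capabilities, base_url="http://localhost:8000"):
    """POST /queue/dequeue; returns a JobSpec dict, or None when no
    matching job is queued (the endpoint returns JSON null)."""
    body = json.dumps({
        "worker_id": worker_id,
        "capabilities": capabilities,
        "execution_mode": "PRIVILEGED_CONTAINER",
    }).encode()
    req = urllib.request.Request(
        f"{base_url}/queue/dequeue",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())  # JSON null parses to Python None

def next_poll_delay(attempt, base=1.0, cap=30.0):
    """Exponential backoff between empty polls: 1s, 2s, 4s, ... capped at `cap`."""
    return min(cap, base * (2 ** attempt))
```

Reset `attempt` to zero whenever a job is actually dequeued, so a busy queue is drained promptly.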
GET /health
Health check endpoint for load balancers and monitoring.
Response (200 OK):
{
"status": "healthy",
"version": "v1",
"timestamp": "2026-01-31T10:00:00Z",
"workers": 3,
"active_jobs": 5
}
cURL Example:
curl http://localhost:8000/health
GET /metrics
Prometheus metrics endpoint.
Response (200 OK):
# HELP hyper2kvm_migration_total Total number of migrations
# TYPE hyper2kvm_migration_total counter
hyper2kvm_migration_total{status="completed"} 125
hyper2kvm_migration_total{status="failed"} 3
# HELP hyper2kvm_migration_duration_seconds Migration duration
# TYPE hyper2kvm_migration_duration_seconds histogram
hyper2kvm_migration_duration_seconds_bucket{le="60.0"} 45
hyper2kvm_migration_duration_seconds_bucket{le="300.0"} 98
hyper2kvm_migration_duration_seconds_bucket{le="600.0"} 120
hyper2kvm_migration_duration_seconds_bucket{le="+Inf"} 128
hyper2kvm_migration_duration_seconds_sum 23456.78
hyper2kvm_migration_duration_seconds_count 128
# HELP hyper2kvm_worker_jobs_active Current number of active jobs
# TYPE hyper2kvm_worker_jobs_active gauge
hyper2kvm_worker_jobs_active 5
Prometheus Configuration:
scrape_configs:
  - job_name: 'hyper2kvm'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: /metrics
    scrape_interval: 15s
cURL Example:
curl http://localhost:8000/metrics
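Because the exposition format is plain text, ad-hoc checks can parse it directly without a Prometheus client library. An illustrative, prefix-matching parser for one metric family:

```python
def parse_metric_samples(text, family):
    """Collect {sample: value} for one metric family from Prometheus
    text exposition format, skipping # HELP/# TYPE comment lines.
    Note this is a prefix match: pass the full family name, since a
    shorter prefix would also match related histogram samples."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or not line.startswith(family):
            continue
        name, _, value = line.rpartition(" ")
        samples[name] = float(value)
    return samples
```

For production alerting, scrape with Prometheus as configured above rather than parsing by hand.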
All error responses follow this format:
{
"error": {
"code": 404,
"message": "Job convert-vm-999 not found",
"path": "/jobs/convert-vm-999",
"timestamp": "2026-01-31T10:00:00Z"
}
}
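Since every error uses the same envelope, clients can unwrap it generically. A sketch; `parse_api_error` is an illustrative helper:

```python
import json

def parse_api_error(body):
    """Unwrap the standard error envelope into (code, message, path)."""
    err = json.loads(body).get("error", {})
    return err.get("code"), err.get("message"), err.get("path")
```

With urllib, the body of a raised `HTTPError` can be read with `exc.read()` and passed straight to this helper.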
HTTP Status Codes:
200 - Success
201 - Created
400 - Bad Request (invalid input)
404 - Not Found
500 - Internal Server Error
# Install dependencies
pip install -r requirements-api.txt
# Run with auto-reload
uvicorn hyper2kvm.worker.api:app --reload --host 0.0.0.0 --port 8000
# Using Gunicorn with multiple workers
gunicorn hyper2kvm.worker.api:app \
-w 4 \
-k uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--access-logfile - \
--error-logfile - \
--log-level info
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt requirements-api.txt ./
RUN pip install --no-cache-dir -r requirements.txt -r requirements-api.txt
COPY hyper2kvm/ ./hyper2kvm/
EXPOSE 8000
CMD ["uvicorn", "hyper2kvm.worker.api:app", "--host", "0.0.0.0", "--port", "8000"]
apiVersion: apps/v1
kind: Deployment
metadata:
  name: hyper2kvm-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: hyper2kvm-api
  template:
    metadata:
      labels:
        app: hyper2kvm-api
    spec:
      containers:
        - name: api
          image: ghcr.io/ssahani/hyper2kvm-worker-api:latest
          ports:
            - containerPort: 8000
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 10
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: hyper2kvm-api
spec:
  selector:
    app: hyper2kvm-api
  ports:
    - port: 80
      targetPort: 8000
  type: LoadBalancer
import httpx
import asyncio
async def submit_and_monitor():
    async with httpx.AsyncClient() as client:
        # Submit job
        response = await client.post(
            "http://localhost:8000/jobs",
            json={
                "job_id": "demo-123",
                "operation": "convert",
                "image": {"path": "/data/vm.vmdk", "format": "vmdk"},
                "parameters": {"output_format": "qcow2"}
            },
            params={"queue": True}
        )
        job_id = response.json()["job_id"]

        # Monitor progress (polling)
        while True:
            status = await client.get(f"http://localhost:8000/jobs/{job_id}")
            state = status.json()["state"]
            print(f"State: {state}")
            if state in ["COMPLETED", "FAILED", "CANCELLED"]:
                break
            await asyncio.sleep(2)

asyncio.run(submit_and_monitor())
// Submit job
const response = await fetch('http://localhost:8000/jobs?queue=true', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({
job_id: 'demo-123',
operation: 'convert',
image: {path: '/data/vm.vmdk', format: 'vmdk'},
parameters: {output_format: 'qcow2'}
})
});
const {job_id} = await response.json();
// Monitor via SSE
const source = new EventSource(`http://localhost:8000/jobs/${job_id}/events/stream`);
source.addEventListener('progress', (event) => {
const data = JSON.parse(event.data);
console.log(`${data.percentage}%: ${data.message}`);
});
source.addEventListener('complete', (event) => {
console.log('Job complete!');
source.close();
});
# Submit and capture job ID
JOB_ID=$(curl -s -X POST "http://localhost:8000/jobs?queue=true" \
-H "Content-Type: application/json" \
-d '{"job_id":"demo-123","operation":"convert","image":{"path":"/data/vm.vmdk","format":"vmdk"},"parameters":{"output_format":"qcow2"}}' \
| jq -r '.job_id')
echo "Job ID: $JOB_ID"
# Monitor progress
while true; do
  STATE=$(curl -s http://localhost:8000/jobs/$JOB_ID | jq -r '.state')
  echo "State: $STATE"
  [[ "$STATE" == "COMPLETED" || "$STATE" == "FAILED" || "$STATE" == "CANCELLED" ]] && break
  sleep 2
done
from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key")

async def verify_api_key(api_key: str = Security(api_key_header)):
    if api_key != "your-secret-key":
        raise HTTPException(status_code=403, detail="Invalid API key")

# Apply to endpoints
@app.post("/jobs", dependencies=[Depends(verify_api_key)])
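On the client side, the key travels in the X-API-Key header that `APIKeyHeader` reads. A sketch using only the standard library; the helper names are illustrative:

```python
import json
import urllib.request

def auth_headers(api_key):
    """Headers for an authenticated JSON request against this API."""
    return {"Content-Type": "application/json", "X-API-Key": api_key}

def submit_job_authenticated(spec, api_key, base_url="http://localhost:8000"):
    """POST /jobs with the API key attached; raises HTTPError on 403."""
    req = urllib.request.Request(
        f"{base_url}/jobs",
        data=json.dumps(spec).encode(),
        headers=auth_headers(api_key),
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())
```

A missing or wrong key produces a 403 response from the `verify_api_key` dependency.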
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/jobs")
@limiter.limit("10/minute")
async def submit_job(...):
    ...
from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],  # Specific domains
    allow_credentials=True,
    allow_methods=["GET", "POST", "DELETE"],
    allow_headers=["*"],
)
# Check if port is in use
lsof -i :8000
# Use different port
uvicorn hyper2kvm.worker.api:app --port 8001
# Check job registry
curl http://localhost:8000/jobs
# Check logs
uvicorn hyper2kvm.worker.api:app --log-level debug
# List workers
curl http://localhost:8000/workers
# Check worker heartbeats (stale after 300s)
# Test with curl
curl -N -H "Accept: text/event-stream" http://localhost:8000/jobs/demo-123/events/stream
# Check browser CORS settings
# Calculate optimal workers: (2 × CPU cores) + 1
NUM_WORKERS=$((2 * $(nproc) + 1))
gunicorn hyper2kvm.worker.api:app -w $NUM_WORKERS -k uvicorn.workers.UvicornWorker
For production, replace the in-memory job and worker registries with persistent storage such as Redis. As one example, response caching via fastapi-cache with a Redis backend:
from redis import asyncio as aioredis

from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache

@app.on_event("startup")
async def startup():
    redis = aioredis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(redis), prefix="api-cache")

@app.get("/workers")
@cache(expire=60)  # Cache for 60 seconds
async def list_workers():
    ...
Last Updated: 2026-01-31 Maintainer: hyper2kvm team