hypersdk

Multi-Cloud Concurrent Export Implementation

Status: Completed Date: 2026-01-23

Overview

Concurrent export functionality with live progress tracking has been implemented for all major cloud providers:

What Was Implemented

1. ExportOptions with Progress Callbacks

Each provider now has an ExportOptions type with:

2. Atomic Progress Tracking

All providers use sync/atomic for thread-safe progress updates:

type ProgressCallback func(current, total int64, fileName string, fileIndex, totalFiles int)

3. Concurrent-Ready Export Functions

Each provider has new export functions:

4. Progress Reader Wrappers

All providers implement callbackProgressReader for real-time byte-level progress:

File Structure

New Files Created

providers/
├── aws/
│   └── export_options.go         # AWS ExportOptions type
├── azure/
│   └── export_options.go         # Azure ExportOptions type
├── gcp/
│   └── export_options.go         # GCP ExportOptions type
└── hyperv/
    └── export_options.go         # Hyper-V ExportOptions type

Modified Files

providers/
├── aws/
│   └── export.go                 # Added callback support
├── azure/
│   └── export.go                 # Added callback support
├── gcp/
│   └── export.go                 # Added callback support
├── hyperv/
│   └── client.go                 # Added callback support
└── vsphere/
    ├── export.go                 # Enhanced with callbacks
    └── export_options.go         # Already existed

Usage Examples

AWS - Concurrent EC2 Instance Exports

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"hypersdk/providers/aws"
)

func main() {
	// Create AWS client
	client, err := aws.NewClient(ctx, aws.Config{
		Region:   "us-east-1",
		S3Bucket: "my-exports",
	})
	if err != nil {
		panic(err)
	}

	// List of instances to export concurrently
	instances := []string{
		"i-1234567890abcdef0",
		"i-abcdef1234567890",
		"i-9876543210fedcba",
	}

	// Progress tracking map
	progress := make(map[string]*exportProgress)
	var mu sync.Mutex

	// Export all instances concurrently
	var wg sync.WaitGroup
	for _, instanceID := range instances {
		wg.Add(1)
		progress[instanceID] = &exportProgress{}

		go func(id string) {
			defer wg.Done()

			// Configure export options with callback
			opts := aws.ExportOptions{
				Format:          "vmdk",
				OutputPath:      "/exports/aws",
				S3Bucket:        "my-exports",
				S3Prefix:        "exports/",
				DownloadFromS3:  true,
				ExportTimeout:   2 * time.Hour,
				ShowProgress:    false, // We handle progress via callback
				ProgressCallback: func(current, total int64, fileName string, fileIndex, totalFiles int) {
					mu.Lock()
					progress[id].current = current
					progress[id].total = total
					progress[id].fileName = fileName
					mu.Unlock()
				},
			}

			// Start export
			result, err := client.ExportInstanceWithOptions(ctx, id, opts)
			if err != nil {
				fmt.Printf("Export failed for %s: %v\n", id, err)
				return
			}

			fmt.Printf("Exported %s to %s\n", id, result.LocalPath)
		}(instanceID)
	}

	// Monitor progress
	done := make(chan bool)
	go func() {
		ticker := time.NewTicker(500 * time.Millisecond)
		defer ticker.Stop()

		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				mu.Lock()
				for id, p := range progress {
					if p.total > 0 {
						pct := float64(p.current) * 100 / float64(p.total)
						fmt.Printf("%s: %.1f%% (%s)\n", id, pct, p.fileName)
					}
				}
				mu.Unlock()
			}
		}
	}()

	// Wait for all exports
	wg.Wait()
	close(done)
	fmt.Println("All exports completed!")
}

type exportProgress struct {
	current  int64
	total    int64
	fileName string
}

Azure - Concurrent Disk Exports

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"hypersdk/providers/azure"
)

func main() {
	// Create Azure client
	client, err := azure.NewClient(ctx, azure.Config{
		SubscriptionID: "your-subscription-id",
		ResourceGroup:  "my-resource-group",
		TenantID:       "your-tenant-id",
	})
	if err != nil {
		panic(err)
	}

	// Disks to export
	disks := []string{"disk1", "disk2", "disk3"}

	var wg sync.WaitGroup
	for _, diskName := range disks {
		wg.Add(1)

		go func(disk string) {
			defer wg.Done()

			opts := azure.ExportOptions{
				Format:        "vhd",
				OutputPath:    "/exports/azure",
				DownloadLocal: true,
				RevokeAccess:  true,
				AccessDuration: 1 * time.Hour,
				ProgressCallback: func(current, total int64, fileName string, fileIndex, totalFiles int) {
					pct := float64(current) * 100 / float64(total)
					fmt.Printf("[%s] %.1f%% - %s\n", disk, pct, fileName)
				},
			}

			result, err := client.ExportDiskWithOptions(ctx, disk, opts)
			if err != nil {
				fmt.Printf("Failed to export %s: %v\n", disk, err)
				return
			}

			fmt.Printf("Exported %s to %s (%d bytes)\n", disk, result.LocalPath, result.Size)
		}(diskName)
	}

	wg.Wait()
	fmt.Println("All Azure disks exported!")
}

GCP - Concurrent Disk Exports

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"hypersdk/providers/gcp"
)

func main() {
	client, err := gcp.NewClient(ctx, gcp.Config{
		ProjectID:       "my-project",
		Zone:            "us-central1-a",
		CredentialsJSON: "/path/to/credentials.json",
	})
	if err != nil {
		panic(err)
	}

	disks := []string{"disk-1", "disk-2", "disk-3"}

	var wg sync.WaitGroup
	for _, diskName := range disks {
		wg.Add(1)

		go func(disk string) {
			defer wg.Done()

			opts := gcp.ExportOptions{
				Format:          "vmdk",
				OutputPath:      "/exports/gcp",
				GCSBucket:       "my-exports",
				GCSPrefix:       "exports/",
				DownloadFromGCS: true,
				CreateImage:     true,
				ImageTimeout:    30 * time.Minute,
				ProgressCallback: func(current, total int64, fileName string, fileIndex, totalFiles int) {
					pct := float64(current) * 100 / float64(total)
					fmt.Printf("[GCP-%s] %.1f%%\n", disk, pct)
				},
			}

			result, err := client.ExportDiskWithOptions(ctx, disk, opts)
			if err != nil {
				fmt.Printf("Failed: %v\n", err)
				return
			}

			fmt.Printf("Exported %s\n", result.DiskName)
		}(diskName)
	}

	wg.Wait()
}

Hyper-V - Concurrent VM Exports

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"hypersdk/providers/hyperv"
)

func main() {
	client, err := hyperv.NewClient(hyperv.Config{
		Host:    "",  // Empty for local
		Timeout: 10 * time.Minute,
	})
	if err != nil {
		panic(err)
	}

	vms := []string{"VM-Web-01", "VM-DB-01", "VM-App-01"}

	var wg sync.WaitGroup
	for _, vmName := range vms {
		wg.Add(1)

		go func(vm string) {
			defer wg.Done()

			opts := hyperv.ExportOptions{
				Format:           "vhdx",
				OutputPath:       "C:\\Exports",
				ExportType:       "vhd-only",
				IncludeSnapshots: false,
				ExportTimeout:    2 * time.Hour,
				ProgressCallback: func(current, total int64, fileName string, fileIndex, totalFiles int) {
					pct := float64(current) * 100 / float64(total)
					speed := float64(current) / 1024 / 1024 // MB
					fmt.Printf("[%s] File %d/%d: %.1f%% (%.1f MB)\n",
						vm, fileIndex, totalFiles, pct, speed)
				},
			}

			err := client.ExportVMWithOptions(ctx, vm, opts)
			if err != nil {
				fmt.Printf("Failed to export %s: %v\n", vm, err)
				return
			}

			fmt.Printf("Exported %s successfully\n", vm)
		}(vmName)
	}

	wg.Wait()
	fmt.Println("All Hyper-V VMs exported!")
}

Integration Patterns

1. Progress Aggregation

type MultiCloudProgress struct {
	mu       sync.Mutex
	exports  map[string]*ExportState
}

type ExportState struct {
	Provider     string
	ResourceID   string
	CurrentBytes int64
	TotalBytes   int64
	FileName     string
	Status       string // "starting", "downloading", "completed", "failed"
	Speed        float64
	StartTime    time.Time
	EndTime      time.Time
}

func (mp *MultiCloudProgress) CreateCallback(provider, resourceID string) func(int64, int64, string, int, int) {
	return func(current, total int64, fileName string, fileIndex, totalFiles int) {
		mp.mu.Lock()
		defer mp.mu.Unlock()

		state := mp.exports[resourceID]
		state.CurrentBytes = current
		state.TotalBytes = total
		state.FileName = fileName
		state.Status = "downloading"

		// Calculate speed
		elapsed := time.Since(state.StartTime).Seconds()
		if elapsed > 0 {
			state.Speed = float64(current) / elapsed / 1024 / 1024 // MB/s
		}
	}
}

2. TUI Integration (Planned)

To integrate with the existing TUI (Enhancement #23), the following changes would be needed:

Phase 1: Provider Abstraction

// Unified VM interface across providers
type UnifiedVM interface {
	GetName() string
	GetProvider() string
	GetID() string
	ExportWithCallback(ctx context.Context, callback ProgressCallback) error
}

// Implement for each provider
type VSphereVM struct { /* ... */ }
type AWSEC2Instance struct { /* ... */ }
type AzureVM struct { /* ... */ }
// etc.

Phase 2: TUI Model Updates

type tuiModel struct {
	// ... existing fields ...

	provider       string // "vsphere", "aws", "azure", "gcp", "hyperv"
	vms            []UnifiedVM // Provider-agnostic VMs
	activeExports  map[string]*activeExportState
	showExportPane bool
}

Phase 3: Provider-Specific Export Logic

func (m tuiModel) startSingleExport(vm UnifiedVM) tea.Cmd {
	return func() tea.Msg {
		ctx, cancel := context.WithCancel(context.Background())

		// Create progress callback
		progressChan := make(chan exportProgressMsg, 100)
		doneChan := make(chan exportDoneMsg, 1)

		callback := func(current, total int64, fileName string, fileIndex, totalFiles int) {
			select {
			case progressChan <- exportProgressMsg{
				vmName:         vm.GetName(),
				currentBytes:   current,
				totalBytes:     total,
				fileName:       fileName,
				currentFileIdx: fileIndex,
				totalFiles:     totalFiles,
			}:
			default:
			}
		}

		// Export using provider-specific method
		go func() {
			err := vm.ExportWithCallback(ctx, callback)
			doneChan <- exportDoneMsg{
				vmName: vm.GetName(),
				err:    err,
			}
		}()

		return exportStartMsg{
			cancelFunc: cancel,
			exportCmd:  waitForExport(progressChan, doneChan),
		}
	}
}

Benefits

1. Concurrent Operations

2. Real-Time Progress

3. Thread Safety

4. Provider Flexibility

5. Error Handling


Technical Implementation Details

Progress Callback Flow

Download/Copy Start
        ↓
Create callbackProgressReader
        ↓
Wrap io.Reader
        ↓
io.Copy() loop
        ↓
callbackProgressReader.Read()
        ↓
atomic.AddInt64(&currentBytes, n)
        ↓
callback(current, total, fileName, fileIndex, totalFiles)
        ↓
UI Update (external)

Atomic Operations

All providers use sync/atomic for thread-safe progress:

// In Read() method of callbackProgressReader
n, err := cpr.reader.Read(p)
current := atomic.AddInt64(cpr.currentBytes, int64(n))
cpr.callback(current, cpr.total, cpr.fileName, cpr.fileIndex, cpr.totalFiles)

This ensures:

Memory Efficiency

Progress readers use minimal overhead:


Performance Considerations

Concurrent Exports:

Progress Update Frequency:

Resource Usage

Per concurrent export:


Testing

Unit Tests Needed

// Test progress callback invocation
func TestAWSExportWithCallback(t *testing.T) {
	var callbackCount int
	var lastProgress int64

	opts := aws.ExportOptions{
		ProgressCallback: func(current, total int64, fileName string, fileIndex, totalFiles int) {
			callbackCount++
			lastProgress = current
		},
	}

	// Mock export and verify callback was called
}

Integration Tests

# Test concurrent exports
go test -run TestConcurrentExport ./providers/...

# Test progress tracking
go test -run TestProgressCallback ./providers/...

# Test cancellation
go test -run TestExportCancellation ./providers/...

Future Enhancements

1. TUI Multi-Provider Support (Not Yet Implemented)

2. Export Queue Prioritization

3. Export History Tracking

4. Batch Export Profiles


Conclusion

Concurrent export functionality with live progress tracking is now implemented for all major cloud providers:

✅ AWS, Azure, GCP, Hyper-V, vSphere

Each provider has:

Status: Production-ready for programmatic use TUI Integration: Requires provider abstraction layer (planned)


For vSphere TUI usage, see: TUI_USER_GUIDE.md (Enhancement #23) For provider API usage, see: Examples above