Files
temporalio-test/app/Http/Controllers/TemporalDemoController.php
2026-05-09 01:18:51 +02:00

837 lines
28 KiB
PHP

<?php
namespace App\Http\Controllers;
use App\Models\ImportJob;
use App\Models\Order;
use App\Temporal\DataEnrichment\DataEnrichmentWorkflowInterface;
use App\Temporal\EloquentQuery\EloquentQueryWorkflowInterface;
use App\Temporal\ExternalApiSync\ExternalApiSyncWorkflowInterface;
use App\Temporal\OrderFulfillment\OrderFulfillmentWorkflowInterface;
use App\Temporal\ProductImport\ProductImportWorkflowInterface;
use App\Temporal\SystemMonitor\SystemMonitorWorkflowInterface;
use App\Temporal\UserMigration\UserMigrationWorkflowInterface;
use App\Temporal\WebhookDelivery\WebhookDeliveryWorkflowInterface;
use Illuminate\Http\JsonResponse;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Artisan;
use Inertia\Inertia;
use Inertia\Response as InertiaResponse;
use Symfony\Component\HttpFoundation\StreamedResponse;
use Temporal\Client\GRPC\ServiceClient;
use Temporal\Client\WorkflowClient;
use Temporal\Client\WorkflowClientInterface;
use Temporal\Client\WorkflowOptions;
use Temporal\Workflow\WorkflowExecutionStatus;
class TemporalDemoController extends Controller
{
public function __construct(
private WorkflowClientInterface $workflowClient,
) {}
public function dashboard(): InertiaResponse
{
$importJobs = ImportJob::latest()->take(10)->get();
$orders = Order::latest()->take(10)->get();
return Inertia::render('Dashboard', [
'importJobs' => $importJobs,
'orders' => $orders,
]);
}
// --- Reset & Terminate ---
public function reset(): StreamedResponse
{
return $this->streamedOperation(function () {
$this->emit('Starting full reset...', 'step');
// 1. Look up container IDs (before dropping DBs, which may crash them)
$containerId = null;
$workerContainerId = null;
try {
$containerId = $this->findDockerContainer('temporal');
$workerContainerId = $this->findDockerContainer('temporal-worker');
} catch (\Throwable $e) {
$this->emit("Docker lookup failed: {$e->getMessage()}", 'error');
}
// 2. Drop Temporal databases (all workflow history will be wiped)
$this->emit('Dropping Temporal databases...', 'step');
try {
$pdo = new \PDO('pgsql:host=temporal-pgsql;port=5432;dbname=postgres', 'temporal', 'temporal');
$pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
foreach (['temporal', 'temporal_visibility'] as $db) {
$pdo->exec("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{$db}' AND pid <> pg_backend_pid()");
$pdo->exec("DROP DATABASE IF EXISTS {$db}");
$pdo->exec("CREATE DATABASE {$db} OWNER temporal");
$this->emit("Recreated database: {$db}");
}
$this->emit('Temporal databases reset', 'success');
} catch (\Throwable $e) {
$this->emit("Database reset failed: {$e->getMessage()}", 'error');
}
// 3. Restart Temporal container (using pre-fetched ID)
$this->emit('Restarting Temporal container...', 'step');
try {
$this->restartDockerContainer($containerId);
$this->emit('Restart signal sent');
} catch (\Throwable $e) {
$this->emit("Docker restart failed: {$e->getMessage()}", 'error');
}
// 4. Wait for Temporal to come back
$this->emit('Waiting for Temporal to come back online...', 'step');
$start = time();
$online = false;
$lastUpdate = 0;
while (time() - $start < 30) {
try {
$client = WorkflowClient::create(
ServiceClient::create(config('temporal.address'))
);
$client->listWorkflowExecutions('ExecutionStatus="Running"', pageSize: 1);
$online = true;
break;
} catch (\Throwable) {
$elapsed = time() - $start;
if ($elapsed - $lastUpdate >= 3) {
$this->emit("Still waiting... ({$elapsed}s)");
$lastUpdate = $elapsed;
}
sleep(1);
}
}
if ($online) {
$this->emit('Temporal is back online', 'success');
} else {
$this->emit('Temporal did not come back within 30s — may need manual restart', 'error');
}
// 5. Restart worker
$this->emit('Restarting worker...', 'step');
try {
$this->restartDockerContainer($workerContainerId);
$this->emit('Worker restarted', 'success');
} catch (\Throwable $e) {
$this->emit("Worker restart failed: {$e->getMessage()}", 'error');
}
// 6. Reset Laravel database
$this->emit('Running migrate:fresh --seed...', 'step');
try {
Artisan::call('migrate:fresh', ['--seed' => true, '--seeder' => 'TemporalDemoSeeder', '--force' => true]);
$this->emit('Database reset and seeded', 'success');
} catch (\Throwable $e) {
$this->emit("Migration failed: {$e->getMessage()}", 'error');
}
$this->emit('Reset complete', 'done');
});
}
public function terminateAll(): StreamedResponse
{
return $this->streamedOperation(function () {
$this->emit('Listing running workflows...', 'step');
$terminated = 0;
try {
$paginator = $this->workflowClient->listWorkflowExecutions(
'ExecutionStatus="Running"',
pageSize: 100,
);
foreach ($paginator as $info) {
try {
$wfId = $info->execution->getID();
$stub = $this->workflowClient->newUntypedRunningWorkflowStub(
$wfId,
$info->execution->getRunID(),
);
$stub->terminate('Manual termination from dashboard');
$terminated++;
$this->emit("Terminated: {$wfId}", 'warn');
} catch (\Throwable) {
// Already completed
}
}
$this->emit("Terminated {$terminated} workflow(s)", 'success');
} catch (\Throwable $e) {
$this->emit("Failed to connect to Temporal: {$e->getMessage()}", 'error');
}
$this->emit($terminated > 0 ? 'All workflows terminated' : 'No running workflows found', 'done');
});
}
/**
* Check actual execution status via Temporal's DescribeWorkflowExecution.
* Returns a terminal status string, or null if the workflow is still running.
*/
private function resolveWorkflowStatus(string $workflowId): ?string
{
try {
$stub = $this->workflowClient->newUntypedRunningWorkflowStub($workflowId);
$description = $stub->describe();
return match ($description->info->status) {
WorkflowExecutionStatus::Completed => 'completed',
WorkflowExecutionStatus::Failed, WorkflowExecutionStatus::TimedOut => 'failed',
WorkflowExecutionStatus::Canceled, WorkflowExecutionStatus::Terminated => 'cancelled',
default => null,
};
} catch (\Throwable) {
// Workflow not found in Temporal — treat as failed
return 'failed';
}
}
private function streamedOperation(\Closure $callback): StreamedResponse
{
return response()->stream(function () use ($callback) {
while (ob_get_level()) {
ob_end_flush();
}
$callback();
}, 200, [
'Content-Type' => 'text/plain',
'Cache-Control' => 'no-cache',
'X-Accel-Buffering' => 'no',
]);
}
private function emit(string $message, string $type = 'info'): void
{
echo json_encode([
'type' => $type,
'message' => $message,
'time' => date('H:i:s'),
]) . "\n";
flush();
}
private function findDockerContainer(string $serviceName): ?string
{
$socketPath = '/var/run/docker.sock';
if (!file_exists($socketPath)) {
$this->emit('Docker socket not found — cannot restart automatically', 'error');
return null;
}
$filters = urlencode(json_encode([
'label' => [
"com.docker.compose.service={$serviceName}",
'com.docker.compose.project=temporalio-test',
],
]));
$response = $this->dockerApiGet("/containers/json?all=true&filters={$filters}");
$containers = json_decode($response, true);
if (empty($containers)) {
$this->emit("Container for service '{$serviceName}' not found", 'error');
return null;
}
$containerId = $containers[0]['Id'];
$containerName = ltrim($containers[0]['Names'][0] ?? $containerId, '/');
$this->emit("Found container: {$containerName}");
return $containerId;
}
private function restartDockerContainer(?string $containerId): void
{
if (!$containerId) {
return;
}
$this->dockerApiPost("/containers/{$containerId}/restart?t=5");
}
private function dockerApiGet(string $path): string
{
return $this->dockerApiRequest('GET', $path);
}
private function dockerApiPost(string $path): string
{
return $this->dockerApiRequest('POST', $path);
}
private function dockerApiRequest(string $method, string $path): string
{
$socket = stream_socket_client('unix:///var/run/docker.sock', $errno, $errstr, 5);
if (!$socket) {
$this->emit("Docker socket connection failed: {$errstr}", 'error');
return '';
}
stream_set_timeout($socket, 30);
$request = "{$method} {$path} HTTP/1.0\r\nHost: localhost\r\n\r\n";
fwrite($socket, $request);
$response = '';
while (!feof($socket)) {
$chunk = fread($socket, 8192);
if ($chunk === false) break;
$meta = stream_get_meta_data($socket);
if ($meta['timed_out']) {
$this->emit('Docker API request timed out', 'error');
break;
}
$response .= $chunk;
}
fclose($socket);
// Strip HTTP headers — body comes after \r\n\r\n
$parts = explode("\r\n\r\n", $response, 2);
return $parts[1] ?? '';
}
// --- Product Import ---
public function startImport(Request $request): JsonResponse
{
$filePath = storage_path('app/imports/products.csv');
if (!file_exists($filePath)) {
return response()->json(['error' => 'CSV file not found. Run TemporalDemoSeeder first.'], 404);
}
$simulationConfig = $request->input('simulation', []);
$workflowId = 'product-import-' . uniqid();
$stub = $this->workflowClient->newWorkflowStub(
ProductImportWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId($workflowId)
->withTaskQueue(config('temporal.task_queue'))
);
$run = $this->workflowClient->start($stub, $filePath, $simulationConfig);
$importJob = ImportJob::create([
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
'type' => 'product_import',
'file_path' => $filePath,
'status' => 'started',
]);
return response()->json([
'message' => 'Product import started',
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
'import_job_id' => $importJob->id,
]);
}
public function pauseImport(string $id): JsonResponse
{
$importJob = ImportJob::findOrFail($id);
$stub = $this->workflowClient->newRunningWorkflowStub(
ProductImportWorkflowInterface::class,
$importJob->workflow_id
);
$stub->pause();
$importJob->update(['status' => 'paused']);
return response()->json(['message' => 'Import paused', 'workflow_id' => $importJob->workflow_id]);
}
public function resumeImport(string $id): JsonResponse
{
$importJob = ImportJob::findOrFail($id);
$stub = $this->workflowClient->newRunningWorkflowStub(
ProductImportWorkflowInterface::class,
$importJob->workflow_id
);
$stub->resume();
$importJob->update(['status' => 'processing']);
return response()->json(['message' => 'Import resumed', 'workflow_id' => $importJob->workflow_id]);
}
public function importStatus(string $id): JsonResponse
{
$importJob = ImportJob::findOrFail($id);
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
ProductImportWorkflowInterface::class,
$importJob->workflow_id
);
$status = $stub->getStatus();
// Query handler returns cached internal state — cross-check
// actual execution status when the query says non-terminal
if (!in_array($status['status'], ['completed', 'cancelled', 'failed'])) {
$resolved = $this->resolveWorkflowStatus($importJob->workflow_id);
if ($resolved !== null) {
$status['status'] = $resolved;
}
}
$importJob->update([
'status' => $status['status'],
'total_records' => $status['totalRecords'] ?? 0,
'processed_records' => $status['processedRecords'] ?? 0,
'failed_records' => $status['failedRecords'] ?? 0,
]);
return response()->json($status);
} catch (\Throwable $e) {
$resolvedStatus = $this->resolveWorkflowStatus($importJob->workflow_id);
if ($resolvedStatus !== null) {
$importJob->update(['status' => $resolvedStatus]);
}
return response()->json([
'status' => $resolvedStatus ?? $importJob->status,
'error' => $e->getMessage(),
]);
}
}
// --- Order Fulfillment ---
public function processOrder(Request $request, int $orderId): JsonResponse
{
$order = Order::findOrFail($orderId);
$simulationConfig = $request->input('simulation', []);
$workflowId = 'order-fulfillment-' . $orderId;
$stub = $this->workflowClient->newWorkflowStub(
OrderFulfillmentWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId($workflowId)
->withTaskQueue(config('temporal.task_queue'))
);
$run = $this->workflowClient->start($stub, $orderId, $simulationConfig);
return response()->json([
'message' => 'Order processing started',
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
'order_id' => $orderId,
]);
}
public function shipOrder(int $orderId, Request $request): JsonResponse
{
$trackingNumber = $request->input('tracking_number', 'TRACK-' . strtoupper(uniqid()));
$workflowId = 'order-fulfillment-' . $orderId;
$stub = $this->workflowClient->newRunningWorkflowStub(
OrderFulfillmentWorkflowInterface::class,
$workflowId
);
$stub->confirmShipping($trackingNumber);
return response()->json([
'message' => 'Shipping confirmation sent',
'workflow_id' => $workflowId,
'tracking_number' => $trackingNumber,
]);
}
public function orderStatus(int $orderId): JsonResponse
{
$workflowId = 'order-fulfillment-' . $orderId;
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
OrderFulfillmentWorkflowInterface::class,
$workflowId
);
$status = $stub->getOrderStatus();
return response()->json($status);
} catch (\Throwable $e) {
return response()->json([
'error' => $e->getMessage(),
'order_id' => $orderId,
]);
}
}
// --- User Migration ---
public function startMigration(Request $request): JsonResponse
{
$totalUsers = $request->input('total_users', 100);
$batchSize = $request->input('batch_size', 20);
$simulationConfig = $request->input('simulation', []);
$workflowId = 'user-migration-' . uniqid();
$stub = $this->workflowClient->newWorkflowStub(
UserMigrationWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId($workflowId)
->withTaskQueue(config('temporal.task_queue'))
);
$run = $this->workflowClient->start($stub, (int) $totalUsers, (int) $batchSize, $simulationConfig);
$importJob = ImportJob::create([
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
'type' => 'user_migration',
'status' => 'started',
'total_records' => $totalUsers,
]);
return response()->json([
'message' => 'User migration started',
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
'import_job_id' => $importJob->id,
]);
}
public function pauseMigration(string $id): JsonResponse
{
$importJob = ImportJob::findOrFail($id);
$stub = $this->workflowClient->newRunningWorkflowStub(
UserMigrationWorkflowInterface::class,
$importJob->workflow_id
);
$stub->pause();
$importJob->update(['status' => 'paused']);
return response()->json(['message' => 'Migration paused', 'workflow_id' => $importJob->workflow_id]);
}
public function resumeMigration(string $id): JsonResponse
{
$importJob = ImportJob::findOrFail($id);
$stub = $this->workflowClient->newRunningWorkflowStub(
UserMigrationWorkflowInterface::class,
$importJob->workflow_id
);
$stub->resume();
$importJob->update(['status' => 'processing']);
return response()->json(['message' => 'Migration resumed', 'workflow_id' => $importJob->workflow_id]);
}
public function migrationStatus(string $id): JsonResponse
{
$importJob = ImportJob::findOrFail($id);
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
UserMigrationWorkflowInterface::class,
$importJob->workflow_id
);
$status = $stub->getProgress();
// Query handler returns cached internal state — cross-check
// actual execution status when the query says non-terminal
if (!in_array($status['status'], ['completed', 'cancelled', 'failed'])) {
$resolved = $this->resolveWorkflowStatus($importJob->workflow_id);
if ($resolved !== null) {
$status['status'] = $resolved;
}
}
$importJob->update([
'status' => $status['status'],
'processed_records' => $status['processedUsers'] ?? 0,
'failed_records' => $status['failedUsers'] ?? 0,
]);
return response()->json($status);
} catch (\Throwable $e) {
$resolvedStatus = $this->resolveWorkflowStatus($importJob->workflow_id);
if ($resolvedStatus !== null) {
$importJob->update(['status' => $resolvedStatus]);
}
return response()->json([
'status' => $resolvedStatus ?? $importJob->status,
'error' => $e->getMessage(),
]);
}
}
// --- External API Sync ---
public function startApiSync(Request $request): JsonResponse
{
$apiEndpoint = $request->input('api_endpoint', 'https://api.example.com/products');
$simulationConfig = $request->input('simulation', []);
$workflowId = 'api-sync-' . uniqid();
$stub = $this->workflowClient->newWorkflowStub(
ExternalApiSyncWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId($workflowId)
->withTaskQueue(config('temporal.task_queue'))
);
$run = $this->workflowClient->start($stub, $apiEndpoint, $simulationConfig);
return response()->json([
'message' => 'API sync started',
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
]);
}
public function apiSyncStatus(string $id): JsonResponse
{
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
ExternalApiSyncWorkflowInterface::class,
$id
);
return response()->json($stub->getProgress());
} catch (\Throwable $e) {
return response()->json(['error' => $e->getMessage()]);
}
}
public function pauseApiSync(string $id): JsonResponse
{
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
ExternalApiSyncWorkflowInterface::class,
$id
);
$stub->pause();
return response()->json(['message' => 'API sync paused']);
} catch (\Throwable $e) {
return response()->json(['error' => $e->getMessage()]);
}
}
public function resumeApiSync(string $id): JsonResponse
{
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
ExternalApiSyncWorkflowInterface::class,
$id
);
$stub->resume();
return response()->json(['message' => 'API sync resumed']);
} catch (\Throwable $e) {
return response()->json(['error' => $e->getMessage()]);
}
}
// --- Webhook Delivery ---
public function deliverWebhooks(Request $request): JsonResponse
{
$payload = $request->input('payload', ['event' => 'order.created', 'data' => ['order_id' => 1]]);
$endpoints = $request->input('endpoints', [
'https://api.example.com/webhook',
'https://hooks.partner.io/events',
'https://notify.service.dev/incoming',
'https://webhook.site/test-endpoint',
]);
$simulationConfig = $request->input('simulation', []);
$workflowId = 'webhook-delivery-' . uniqid();
$stub = $this->workflowClient->newWorkflowStub(
WebhookDeliveryWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId($workflowId)
->withTaskQueue(config('temporal.task_queue'))
);
$run = $this->workflowClient->start($stub, $payload, $endpoints, $simulationConfig);
return response()->json([
'message' => 'Webhook delivery started',
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
'endpoints_count' => count($endpoints),
]);
}
public function webhookStatus(string $id): JsonResponse
{
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
WebhookDeliveryWorkflowInterface::class,
$id
);
return response()->json($stub->getDeliveryStatus());
} catch (\Throwable $e) {
return response()->json(['error' => $e->getMessage()]);
}
}
// --- Data Enrichment ---
public function startEnrichment(Request $request): JsonResponse
{
$recordIds = $request->input('record_ids', []);
$simulationConfig = $request->input('simulation', []);
// Default to first 5 orders if no IDs provided
if (empty($recordIds)) {
$recordIds = Order::orderBy('id')->take(5)->pluck('id')->toArray();
}
$workflowId = 'data-enrichment-' . uniqid();
$stub = $this->workflowClient->newWorkflowStub(
DataEnrichmentWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId($workflowId)
->withTaskQueue(config('temporal.task_queue'))
);
$run = $this->workflowClient->start($stub, $recordIds, $simulationConfig);
return response()->json([
'message' => 'Data enrichment started',
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
'record_count' => count($recordIds),
]);
}
public function enrichmentStatus(string $id): JsonResponse
{
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
DataEnrichmentWorkflowInterface::class,
$id
);
return response()->json($stub->getProgress());
} catch (\Throwable $e) {
return response()->json(['error' => $e->getMessage()]);
}
}
// --- Eloquent Query Pipeline ---
public function startEloquentQuery(Request $request): JsonResponse
{
$simulationConfig = $request->input('simulation', []);
$workflowId = 'eloquent-query-' . uniqid();
$stub = $this->workflowClient->newWorkflowStub(
EloquentQueryWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId($workflowId)
->withTaskQueue(config('temporal.task_queue'))
);
$run = $this->workflowClient->start($stub, $simulationConfig);
return response()->json([
'message' => 'Eloquent query pipeline started',
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
]);
}
public function eloquentQueryStatus(string $id): JsonResponse
{
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
EloquentQueryWorkflowInterface::class,
$id
);
return response()->json($stub->getProgress());
} catch (\Throwable $e) {
return response()->json(['error' => $e->getMessage()]);
}
}
// --- System Health Monitor ---
public function startSystemMonitor(Request $request): JsonResponse
{
$intervalSeconds = $request->input('interval_seconds', 60);
$maxIterations = $request->input('max_iterations', 30);
$simulationConfig = $request->input('simulation', []);
$workflowId = 'system-monitor-' . uniqid();
$stub = $this->workflowClient->newWorkflowStub(
SystemMonitorWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId($workflowId)
->withTaskQueue(config('temporal.task_queue'))
);
$run = $this->workflowClient->start($stub, (int) $intervalSeconds, (int) $maxIterations, [], $simulationConfig);
return response()->json([
'message' => 'System monitor started',
'workflow_id' => $workflowId,
'run_id' => $run->getExecution()->getRunID(),
]);
}
public function systemMonitorStatus(string $id): JsonResponse
{
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
SystemMonitorWorkflowInterface::class,
$id
);
return response()->json($stub->getStatus());
} catch (\Throwable $e) {
return response()->json(['error' => $e->getMessage()]);
}
}
public function stopSystemMonitor(string $id): JsonResponse
{
try {
$stub = $this->workflowClient->newRunningWorkflowStub(
SystemMonitorWorkflowInterface::class,
$id
);
$stub->stop();
return response()->json(['message' => 'Stop signal sent']);
} catch (\Throwable $e) {
return response()->json(['error' => $e->getMessage()]);
}
}
}