openfoodfacts-api/app/Console/Commands/ImportOpenFoodFactsData.php

431 lines
15 KiB
PHP

<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Storage;
use Illuminate\Support\Facades\Cache;
use App\Models\Product;
use App\Models\Ingredient;
use App\Models\NutritionFact;
use Carbon\Carbon;
/**
 * Artisan command that downloads the Open Food Facts product export and imports it into
 * the `products`, `ingredients` and `nutrition_facts` tables.
 *
 * The export is parsed as tab-separated text. The download is skipped when a file fetched
 * today is already on disk (the download date is remembered in the cache) unless --force is
 * given. Rows are written to the database in batches of --chunk rows.
 */
class ImportOpenFoodFactsData extends Command
{
    protected $signature = 'import:open-food-facts {--chunk=1000} {--force}';

    protected $description = 'Import data from Open Food Facts (limited to once per day)';

    /** Maximum number of product codes bound into one DELETE ... IN (...) statement. */
    private const DELETE_BATCH_SIZE = 1000;

    /** File name of the downloaded export on the default storage disk. */
    private $fileName = 'open_food_facts_latest.csv';

    /** Cache key holding the date (Y-m-d) of the last successful download. */
    private $cacheKey = 'last_open_food_facts_import';

    /** Source URL of the full product export. */
    private $url = 'https://openfoodfacts-ds.s3.eu-west-3.amazonaws.com/en.openfoodfacts.org.products.csv';

    /** Column names read from the first non-blank line; null until that line has been seen. */
    private $header = null;

    /** Trailing, possibly incomplete line carried over from one read chunk to the next. */
    private $buffer = '';

    /** Number of data lines seen so far (the header line is not counted). */
    private $rowCount = 0;

    /** Number of data rows accepted into a chunk. */
    private $processedCount = 0;

    /** Number of rows/products reported as errors and skipped. */
    private $errorCount = 0;

    /** Parsed rows (column name => raw value) waiting to be written by processChunk(). */
    private $currentChunk = [];

    /**
     * Entry point: download the export unless today's copy is already present, then import it.
     *
     * @return int 0 on success, 1 on failure.
     */
    public function handle()
    {
        $chunkSize = (int) $this->option('chunk');
        if ($chunkSize < 1) {
            $this->error('The --chunk option must be a positive integer.');
            return 1;
        }

        $lastDownload = Cache::get($this->cacheKey);
        $fileExists = Storage::exists($this->fileName);
        if ($lastDownload === now()->toDateString() && $fileExists && !$this->option('force')) {
            if ($this->confirm('A file from today already exists. Do you want to reimport the data?')) {
                return $this->processFile($chunkSize);
            }
            $this->info('Import cancelled.');
            return 0;
        }

        if (!$this->downloadLargeFile()) {
            return 1;
        }
        // Remember today's download until midnight so later runs today can skip it.
        Cache::put($this->cacheKey, now()->toDateString(), Carbon::tomorrow());
        $this->info("File downloaded and saved as {$this->fileName}");

        return $this->processFile($chunkSize);
    }

    /**
     * Stream the export to "<fileName>.tmp" and move it into place only once the transfer succeeded,
     * so a failed download never replaces (or masquerades as) a good file.
     *
     * @return bool true when the file was downloaded and moved into place.
     */
    private function downloadLargeFile()
    {
        $this->info("Downloading data from Open Food Facts...");
        $tempFile = Storage::path($this->fileName . '.tmp');
        $finalFile = Storage::path($this->fileName);

        $fileHandle = fopen($tempFile, 'w');
        if ($fileHandle === false) {
            $this->error("Failed to open temporary file for writing: {$tempFile}");
            return false;
        }

        $curl = curl_init();
        if ($curl === false) {
            fclose($fileHandle);
            $this->deleteFileIfPresent($tempFile);
            $this->error('Failed to download the file: unable to initialise cURL');
            return false;
        }

        $lastProgress = null;
        curl_setopt_array($curl, [
            CURLOPT_URL => $this->url,
            CURLOPT_FOLLOWLOCATION => true,
            // Treat HTTP status >= 400 as a failure instead of saving the error page as the export.
            CURLOPT_FAILONERROR => true,
            // Body goes straight to disk; RETURNTRANSFER must not be set or the whole file is buffered in memory.
            CURLOPT_FILE => $fileHandle,
            CURLOPT_TIMEOUT => 0,
            CURLOPT_BUFFERSIZE => 128 * 1024, // 128 KB
            CURLOPT_NOPROGRESS => false,
            // PHP passes the cURL handle first, followed by the download/upload totals and counters.
            CURLOPT_PROGRESSFUNCTION => function ($handle, $downloadSize, $downloaded, $uploadSize, $uploaded) use (&$lastProgress) {
                if ($downloadSize > 0) {
                    $progress = round(($downloaded / $downloadSize) * 100, 2);
                    // Only redraw when the displayed value actually changes.
                    if ($progress !== $lastProgress) {
                        $lastProgress = $progress;
                        $this->output->write("\rDownload progress: $progress%");
                    }
                }
                // Any non-zero return value would abort the transfer.
                return 0;
            },
        ]);

        $success = curl_exec($curl);
        $error = curl_error($curl);
        curl_close($curl);
        fclose($fileHandle);

        if ($success === false) {
            $this->deleteFileIfPresent($tempFile);
            $this->newLine();
            $this->error("Failed to download the file: " . $error);
            return false;
        }

        if (!rename($tempFile, $finalFile)) {
            $this->deleteFileIfPresent($tempFile);
            $this->newLine();
            $this->error("Failed to move the downloaded file to {$finalFile}");
            return false;
        }

        $this->info("\nDownload completed successfully.");
        return true;
    }

    /**
     * Remove a (temporary) file if it exists; used to clean up after a failed download.
     */
    private function deleteFileIfPresent($path)
    {
        if (is_file($path)) {
            unlink($path);
        }
    }

    /**
     * Stream the downloaded export line by line and import it in chunks.
     *
     * @param int|string $chunkSize Rows per database batch; values below 1 are treated as 1.
     * @return int 0 when the whole file was read, 1 otherwise.
     */
    private function processFile($chunkSize)
    {
        $chunkSize = max(1, (int) $chunkSize);
        $this->resetImportState();

        $this->info("Processing data...");
        $filePath = Storage::path($this->fileName);
        if (!is_file($filePath)) {
            $this->error("File not found: {$filePath}");
            return 1;
        }

        $bar = $this->output->createProgressBar(filesize($filePath));
        $completed = $this->file_get_contents_chunked($filePath, 1024 * 1024, function ($chunk, $handle) use ($chunkSize, $bar) {
            $this->buffer .= $chunk;
            $lines = explode("\n", $this->buffer);
            // Until EOF the last piece may be a line cut in half by the read; keep it for the next chunk.
            $this->buffer = feof($handle) ? '' : array_pop($lines);
            foreach ($lines as $line) {
                $this->consumeLine($line, $chunkSize);
            }
            $bar->advance(strlen($chunk));
        });

        // Defensive: the final (EOF) read normally consumes any trailing fragment already.
        if ($this->buffer !== '') {
            $this->consumeLine($this->buffer, $chunkSize);
            $this->buffer = '';
        }
        // Write the rows that did not fill a complete chunk (previously these were silently dropped).
        if (!empty($this->currentChunk)) {
            $this->processChunk();
        }

        $bar->finish();
        $this->newLine(2);
        $this->info("Import completed. Total rows processed: {$this->processedCount}");
        $this->info("Total errors encountered: {$this->errorCount}");

        return $completed ? 0 : 1;
    }

    /**
     * Clear all per-run parsing state so a reused command instance starts from scratch.
     */
    private function resetImportState()
    {
        $this->header = null;
        $this->buffer = '';
        $this->rowCount = 0;
        $this->processedCount = 0;
        $this->errorCount = 0;
        $this->currentChunk = [];
    }

    /**
     * Handle one raw line: the first non-blank line becomes the header, every later one a data row.
     * Flushes the pending rows once $chunkSize of them have been queued.
     */
    private function consumeLine($line, $chunkSize)
    {
        // Tolerate CRLF line endings.
        $line = rtrim($line, "\r");
        if (trim($line) === '') {
            return;
        }

        if ($this->header === null) {
            $header = str_getcsv($line, "\t", '"', '\\');
            // Strip a UTF-8 byte-order mark so the first column keeps its plain name.
            if (substr($header[0], 0, 3) === "\xEF\xBB\xBF") {
                $header[0] = substr($header[0], 3);
            }
            $this->header = $header;
            return;
        }

        $this->rowCount++;
        $this->processRow(str_getcsv($line, "\t", '"', '\\'));
        if (count($this->currentChunk) >= $chunkSize) {
            $this->processChunk();
        }
    }

    /**
     * Read $file in $chunk_size-byte pieces and call $callback($chunk, $handle, $iteration) for each.
     * The handle is passed so the callback can test feof(). The handle is always closed, even when
     * the callback throws; such exceptions propagate to the caller.
     *
     * @return bool false if the file could not be opened or a read failed, true otherwise.
     */
    private function file_get_contents_chunked($file, $chunk_size, $callback)
    {
        $handle = is_readable($file) ? fopen($file, "r") : false;
        if ($handle === false) {
            $this->error("Error reading file: unable to open {$file}");
            return false;
        }

        try {
            $iteration = 0;
            while (!feof($handle)) {
                $chunk = fread($handle, $chunk_size);
                if ($chunk === false) {
                    $this->error("Error reading file: read failed on {$file}");
                    return false;
                }
                $callback($chunk, $handle, $iteration);
                $iteration++;
            }
        } finally {
            fclose($handle);
        }

        return true;
    }

    /**
     * Pair a parsed row with the header and queue it for the next chunk.
     * Rows whose field count differs from the header are reported and skipped; checked up front
     * because array_combine() throws a ValueError (not an \Exception) on PHP 8 in that case.
     */
    private function processRow($rowData)
    {
        $expected = count($this->header);
        $actual = count($rowData);
        if ($actual !== $expected) {
            $this->error("Error processing row {$this->rowCount}: expected {$expected} columns, got {$actual}");
            $this->errorCount++;
            return;
        }

        $this->currentChunk[] = array_combine($this->header, $rowData);
        $this->processedCount++;
    }

    /**
     * Write the queued rows: upsert their products, then replace those products' ingredient and
     * nutrition-fact rows. Everything runs in one transaction so a chunk is applied atomically.
     */
    private function processChunk()
    {
        $rows = $this->currentChunk;
        // Reset first: the rows are consumed even if the database write below throws.
        $this->currentChunk = [];

        $productsData = [];
        $ingredientsByCode = [];
        $nutritionFactsByCode = [];
        foreach ($rows as $row) {
            $code = (string) ($row['code'] ?? '');
            if (trim($code) === '') {
                $this->error('Error processing product: missing product code, row skipped');
                $this->errorCount++;
                continue;
            }

            try {
                $product = $this->extractProductData($row);
                $ingredients = $this->extractIngredientsData($row, $code);
                $nutritionFacts = $this->extractNutritionFactsData($row, $code);
            } catch (\Exception $e) {
                $this->error("Error processing product {$code}: " . $e->getMessage());
                $this->errorCount++;
                continue;
            }

            // Keyed by code: a product repeated within the chunk keeps only its last version (and
            // its children), which also keeps ON CONFLICT from touching one row twice per statement.
            $productsData[$code] = $product;
            $ingredientsByCode[$code] = $ingredients;
            $nutritionFactsByCode[$code] = $nutritionFacts;
        }

        if (empty($productsData)) {
            return;
        }

        // PHP turns integer-like array keys into ints; keep the codes as strings.
        $codes = array_map('strval', array_keys($productsData));
        $ingredientsData = array_merge([], ...array_values($ingredientsByCode));
        $nutritionFactsData = array_merge([], ...array_values($nutritionFactsByCode));

        DB::transaction(function () use ($productsData, $codes, $ingredientsData, $nutritionFactsData) {
            $this->bulkUpsert('products', array_keys(reset($productsData)), array_values($productsData));

            // Replace child rows instead of appending them, so re-running the import is idempotent.
            $this->deleteByProductCodes('ingredients', $codes);
            $this->deleteByProductCodes('nutrition_facts', $codes);

            $this->bulkInsert('ingredients', ['product_code', 'name'], $ingredientsData);
            $this->bulkInsert('nutrition_facts', ['product_code', 'nutrient', 'amount', 'unit'], $nutritionFactsData);
        });
    }

    /**
     * Delete all rows of $table whose product_code is one of $codes (bound as parameters, in batches).
     */
    private function deleteByProductCodes($table, $codes)
    {
        foreach (array_chunk($codes, self::DELETE_BATCH_SIZE) as $batch) {
            $placeholders = implode(',', array_fill(0, count($batch), '?'));
            DB::delete("DELETE FROM $table WHERE product_code IN ($placeholders)", $batch);
        }
    }

    /**
     * Split the comma-separated `ingredients_tags` column into one row per ingredient tag.
     *
     * @return array<int, array{product_code: string, name: string}>
     */
    private function extractIngredientsData($row, $productCode)
    {
        if (empty($row['ingredients_tags'])) {
            return [];
        }

        $ingredients = [];
        foreach (explode(',', $row['ingredients_tags']) as $name) {
            $name = trim($name);
            // Skip the empty entries produced by stray or doubled commas.
            if ($name === '') {
                continue;
            }
            $ingredients[] = [
                'product_code' => $productCode,
                'name' => $name,
            ];
        }

        return $ingredients;
    }

    /**
     * Insert or update $values in $table, using the `code` column as the conflict target.
     * On conflict every column except `code` and `created_at` is overwritten, so the original
     * creation timestamp of an existing product is preserved.
     */
    private function bulkUpsert($table, $columns, $values)
    {
        if (empty($values)) {
            return;
        }

        $updatable = array_diff($columns, ['code', 'created_at']);
        $conflictAction = empty($updatable)
            ? 'DO NOTHING'
            : 'DO UPDATE SET ' . implode(',', array_map(
                fn ($col) => $this->quoteIdentifier($col) . " = EXCLUDED." . $this->quoteIdentifier($col),
                $updatable
            ));

        $query = "INSERT INTO $table (" . $this->buildColumnList($columns) . ") VALUES "
            . $this->buildValuesList($columns, $values)
            . " ON CONFLICT (code) $conflictAction";
        DB::statement($query);
    }

    /**
     * Insert $values into $table in a single multi-row INSERT; does nothing when $values is empty.
     */
    private function bulkInsert($table, $columns, $values)
    {
        if (empty($values)) {
            return;
        }

        $query = "INSERT INTO $table (" . $this->buildColumnList($columns) . ") VALUES "
            . $this->buildValuesList($columns, $values);
        DB::statement($query);
    }

    /**
     * Comma-separated list of quoted column identifiers.
     */
    private function buildColumnList($columns)
    {
        return implode(',', array_map([$this, 'quoteIdentifier'], $columns));
    }

    /**
     * Render $rows as "(v1,v2,...),(...)" SQL tuples in $columns order; missing keys become NULL.
     */
    private function buildValuesList($columns, $rows)
    {
        // Fetch the PDO once per statement rather than once per value.
        $pdo = DB::getPdo();
        $tuples = [];
        foreach ($rows as $row) {
            $rowValues = [];
            foreach ($columns as $column) {
                $rowValues[] = $this->escapeValue($row[$column] ?? null, $pdo);
            }
            $tuples[] = '(' . implode(',', $rowValues) . ')';
        }

        return implode(',', $tuples);
    }

    /**
     * Map a CSV row to the `products` columns. Empty strings become null, and numeric and boolean
     * columns are converted. Values that are not numeric (or not finite) become null rather than 0.
     */
    private function extractProductData($row)
    {
        $productColumns = [
            'code', 'url', 'creator', 'created_t', 'created_datetime', 'last_modified_t',
            'last_modified_datetime', 'last_modified_by', 'last_updated_t', 'last_updated_datetime',
            'product_name', 'abbreviated_product_name', 'generic_name', 'quantity', 'packaging',
            'packaging_tags', 'packaging_en', 'packaging_text', 'brands', 'brands_tags', 'categories',
            'categories_tags', 'categories_en', 'origins', 'origins_tags', 'origins_en',
            'manufacturing_places', 'manufacturing_places_tags', 'labels', 'labels_tags', 'labels_en',
            'emb_codes', 'emb_codes_tags', 'first_packaging_code_geo', 'cities', 'cities_tags',
            'purchase_places', 'stores', 'countries', 'countries_tags', 'countries_en',
            'ingredients_text', 'ingredients_tags', 'ingredients_analysis_tags', 'allergens',
            'allergens_en', 'traces', 'traces_tags', 'traces_en', 'serving_size', 'serving_quantity',
            'no_nutrition_data', 'additives_n', 'additives', 'additives_tags', 'additives_en',
            'nutriscore_score', 'nutriscore_grade', 'nova_group', 'pnns_groups_1', 'pnns_groups_2',
            'food_groups', 'food_groups_tags', 'food_groups_en', 'states', 'states_tags', 'states_en',
            'brand_owner', 'ecoscore_score', 'ecoscore_grade', 'nutrient_levels_tags',
            'product_quantity', 'owner', 'data_quality_errors_tags', 'unique_scans_n',
            'popularity_tags', 'completeness', 'last_image_t', 'last_image_datetime',
            'main_category', 'main_category_en', 'image_url', 'image_small_url',
            'image_ingredients_url', 'image_ingredients_small_url', 'image_nutrition_url',
            'image_nutrition_small_url',
        ];
        $data = array_intersect_key($row, array_flip($productColumns));

        // Convert empty strings to null
        $data = array_map(fn ($value) => $value === '' ? null : $value, $data);

        $integerFields = ['created_t', 'last_modified_t', 'last_updated_t', 'additives_n', 'unique_scans_n'];
        $floatFields = ['serving_quantity', 'nutriscore_score', 'ecoscore_score', 'completeness'];
        $booleanFields = ['no_nutrition_data'];

        foreach ($integerFields as $field) {
            if (isset($data[$field])) {
                $data[$field] = is_numeric($data[$field]) ? (int) $data[$field] : null;
            }
        }
        foreach ($floatFields as $field) {
            if (isset($data[$field])) {
                $number = is_numeric($data[$field]) ? (float) $data[$field] : null;
                // INF/NAN cannot be written as SQL numeric literals.
                $data[$field] = ($number !== null && is_finite($number)) ? $number : null;
            }
        }
        foreach ($booleanFields as $field) {
            $data[$field] = isset($data[$field]) ? $this->parseBoolean($data[$field]) : false;
        }

        $now = now()->format('Y-m-d H:i:s');
        $data['created_at'] = $now;
        $data['updated_at'] = $now;

        return $data;
    }

    /**
     * One nutrition-fact row per listed per-100g column that holds a finite numeric value.
     *
     * @return array<int, array{product_code: string, nutrient: string, amount: float, unit: string}>
     */
    private function extractNutritionFactsData($row, $productCode)
    {
        $nutritionFields = [
            'energy-kj_100g', 'energy-kcal_100g', 'energy_100g', 'fat_100g', 'saturated-fat_100g',
            'carbohydrates_100g', 'sugars_100g', 'fiber_100g', 'proteins_100g', 'salt_100g', 'sodium_100g',
        ];

        $nutritionFacts = [];
        foreach ($nutritionFields as $field) {
            // Non-numeric values used to be stored as 0; skip them instead.
            if (!isset($row[$field]) || !is_numeric($row[$field])) {
                continue;
            }
            $amount = (float) $row[$field];
            if (!is_finite($amount)) {
                continue;
            }
            $nutritionFacts[] = [
                'product_code' => $productCode,
                'nutrient' => $field,
                'amount' => $amount,
                'unit' => $this->getNutrientUnit($field),
            ];
        }

        return $nutritionFacts;
    }

    /**
     * Unit for a nutrient column name: "kcal" for kcal energy columns, "kJ" for the other
     * energy columns, grams for everything else.
     */
    private function getNutrientUnit($nutrient)
    {
        if (strpos($nutrient, 'energy') !== false) {
            return strpos($nutrient, 'kcal') !== false ? 'kcal' : 'kJ';
        }
        return 'g';
    }

    /**
     * Wrap an identifier in double quotes, doubling any embedded double quote.
     */
    private function quoteIdentifier($identifier)
    {
        return '"' . str_replace('"', '""', $identifier) . '"';
    }

    /**
     * Render a PHP value as an SQL literal.
     * Strings are quoted by the PDO driver, which knows the server's escaping rules.
     * Hand-doubling backslashes corrupts data under PostgreSQL's standard_conforming_strings.
     *
     * @param mixed     $value
     * @param \PDO|null $pdo   Connection used for quoting; resolved from the DB facade when null.
     */
    private function escapeValue($value, $pdo = null)
    {
        if ($value === null) {
            return 'NULL';
        }
        if (is_bool($value)) {
            return $value ? 'TRUE' : 'FALSE';
        }
        if (is_int($value)) {
            return (string) $value;
        }
        if (is_float($value)) {
            return is_finite($value) ? (string) $value : 'NULL';
        }

        return ($pdo ?: DB::getPdo())->quote((string) $value);
    }

    /**
     * Interpret common textual/numeric boolean spellings; anything unrecognised is false.
     * "on"/"off" follow the usual checkbox convention (also accepted by FILTER_VALIDATE_BOOLEAN).
     */
    private function parseBoolean($value)
    {
        if (is_bool($value)) {
            return $value;
        }
        if (is_string($value)) {
            $normalized = strtolower(trim($value));
            if (in_array($normalized, ['true', '1', 'yes', 'on'], true)) {
                return true;
            }
            if (in_array($normalized, ['false', '0', 'no', 'off', ''], true)) {
                return false;
            }
        }
        if (is_numeric($value)) {
            // Compare numerically so e.g. "0.0" is false (a (bool) cast of that string would be true).
            return (float) $value !== 0.0;
        }
        return false; // Default to false for any other case
    }
}