Add Columbia catalog mapper, shared upsert, and sync service
Phase 2: the mapping and sync core. - ColumbiaCatalogMapper (pure/static, unit-tested): maps an API product to a PowderCatalogItem. Derives manufacturer (PPG/KP Pigments/Columbia) from taxonomy+SKU; flags additives into the Powder Additives category; takes base price from the top-level price with variant fallback; captures variation / tiered pricing as JSON; parses the free-text cure schedule into all curves (three degree glyphs, @/at, multi-curve in order, partial-cure -> none) with the first as the primary temp/time; strips HTML descriptions; joins color groups; normalizes chemistry; flags clear-coat powders. - PowderCatalogUpsertService (IPowderCatalogUpsertService): single upsert path matching on (VendorName, SKU). Copies only feed-sourced fields and leaves enrichment fields (specific gravity, coverage, transfer efficiency, finish) untouched so syncs never wipe lazily-enriched TDS/AI data. - ColumbiaCatalogSyncService (IColumbiaCatalogSyncService): pulls the full catalog, maps + de-dupes, upserts, then reconciles discontinuations ONLY on a complete pull (a partial pull throws and aborts before the sweep). Reactivates reappearing items; records last-synced/last-result platform settings. - 25 mapper unit tests covering the cure parser, manufacturer derivation, simple/variable pricing, chemistry, color, and HTML cases from real records. Full suite green (261 passed). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,155 @@
|
||||
using System.Diagnostics;
|
||||
using System.Globalization;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using PowderCoating.Application.Constants;
|
||||
using PowderCoating.Application.Interfaces;
|
||||
using PowderCoating.Core.Entities;
|
||||
using PowderCoating.Core.Interfaces;
|
||||
|
||||
namespace PowderCoating.Infrastructure.Services.Columbia;
|
||||
|
||||
/// <summary>
|
||||
/// Full Columbia Coatings catalog sync: pages the API, maps each product, upserts via the shared
|
||||
/// <see cref="IPowderCatalogUpsertService"/>, then reconciles discontinuations against the complete
|
||||
/// pull. The discontinuation sweep runs ONLY after a successful full fetch — a partial pull (any
|
||||
/// page failure throws from the client) aborts before the sweep so a transient error can never mass
|
||||
/// flag the catalog as discontinued.
|
||||
/// </summary>
|
||||
public class ColumbiaCatalogSyncService : IColumbiaCatalogSyncService
|
||||
{
|
||||
private readonly IColumbiaCoatingsApiClient _client;
|
||||
private readonly IPowderCatalogUpsertService _upsert;
|
||||
private readonly IUnitOfWork _unitOfWork;
|
||||
private readonly IPlatformSettingsService _settings;
|
||||
private readonly ILogger<ColumbiaCatalogSyncService> _logger;
|
||||
|
||||
public ColumbiaCatalogSyncService(
|
||||
IColumbiaCoatingsApiClient client,
|
||||
IPowderCatalogUpsertService upsert,
|
||||
IUnitOfWork unitOfWork,
|
||||
IPlatformSettingsService settings,
|
||||
ILogger<ColumbiaCatalogSyncService> logger)
|
||||
{
|
||||
_client = client;
|
||||
_upsert = upsert;
|
||||
_unitOfWork = unitOfWork;
|
||||
_settings = settings;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public async Task<ColumbiaSyncResult> RunSyncAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
var result = new ColumbiaSyncResult { StartedAt = DateTime.UtcNow };
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
|
||||
if (!_client.IsConfigured)
|
||||
{
|
||||
result.ErrorMessage = "Columbia API key is not configured.";
|
||||
await RecordResultAsync(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
// Full pull — throws on any page failure, which we treat as an incomplete sync.
|
||||
var products = await _client.GetAllProductsAsync(cancellationToken);
|
||||
result.TotalFetched = products.Count;
|
||||
|
||||
// Map and de-duplicate by (VendorName, SKU) in case the feed repeats a SKU.
|
||||
var mapped = products
|
||||
.Select(ColumbiaCatalogMapper.Map)
|
||||
.Where(m => !string.IsNullOrWhiteSpace(m.Sku))
|
||||
.GroupBy(m => $"{m.VendorName}|{m.Sku}", StringComparer.OrdinalIgnoreCase)
|
||||
.Select(g => g.First())
|
||||
.ToList();
|
||||
|
||||
var upsertResult = await _upsert.UpsertAsync(mapped, result.StartedAt, cancellationToken);
|
||||
result.Inserted = upsertResult.Inserted;
|
||||
result.Updated = upsertResult.Updated;
|
||||
result.Unchanged = upsertResult.Unchanged;
|
||||
result.Skipped = upsertResult.Skipped;
|
||||
|
||||
// Complete pull succeeded — safe to reconcile discontinuations.
|
||||
var incomingKeys = mapped
|
||||
.Select(m => $"{m.VendorName}|{m.Sku}")
|
||||
.ToHashSet(StringComparer.OrdinalIgnoreCase);
|
||||
(result.Discontinued, result.Reactivated) =
|
||||
await ReconcileDiscontinuationsAsync(incomingKeys, result.StartedAt);
|
||||
|
||||
result.Success = true;
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Columbia catalog sync failed; skipping discontinuation sweep.");
|
||||
result.Success = false;
|
||||
result.ErrorMessage = ex.Message;
|
||||
}
|
||||
|
||||
stopwatch.Stop();
|
||||
result.Duration = stopwatch.Elapsed;
|
||||
await RecordResultAsync(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Flags catalog items sourced from Columbia that were NOT in this complete pull as discontinued,
|
||||
/// and reactivates any previously-discontinued item that has reappeared. Returns (discontinued,
|
||||
/// reactivated) counts.
|
||||
/// </summary>
|
||||
private async Task<(int Discontinued, int Reactivated)> ReconcileDiscontinuationsAsync(
|
||||
HashSet<string> incomingKeys, DateTime runTimestamp)
|
||||
{
|
||||
var sourced = await _unitOfWork.PowderCatalog.FindAsync(
|
||||
p => p.Source == ColumbiaIntegrationConstants.SourceName);
|
||||
|
||||
var discontinued = 0;
|
||||
var reactivated = 0;
|
||||
|
||||
foreach (var item in sourced)
|
||||
{
|
||||
var present = incomingKeys.Contains($"{item.VendorName}|{item.Sku}");
|
||||
|
||||
if (!present && !item.IsDiscontinued)
|
||||
{
|
||||
item.IsDiscontinued = true;
|
||||
item.UpdatedAt = runTimestamp;
|
||||
await _unitOfWork.PowderCatalog.UpdateAsync(item);
|
||||
discontinued++;
|
||||
}
|
||||
else if (present && item.IsDiscontinued)
|
||||
{
|
||||
item.IsDiscontinued = false;
|
||||
item.UpdatedAt = runTimestamp;
|
||||
await _unitOfWork.PowderCatalog.UpdateAsync(item);
|
||||
reactivated++;
|
||||
}
|
||||
}
|
||||
|
||||
if (discontinued > 0 || reactivated > 0)
|
||||
await _unitOfWork.CompleteAsync();
|
||||
|
||||
return (discontinued, reactivated);
|
||||
}
|
||||
|
||||
/// <summary>Persists the run outcome to the last-synced / last-result platform settings.</summary>
|
||||
private async Task RecordResultAsync(ColumbiaSyncResult result)
|
||||
{
|
||||
if (result.Success)
|
||||
{
|
||||
await _settings.SetAsync(
|
||||
ColumbiaIntegrationConstants.SettingLastSyncedAt,
|
||||
result.StartedAt.ToString("O", CultureInfo.InvariantCulture),
|
||||
updatedBy: "Columbia Sync");
|
||||
}
|
||||
|
||||
await _settings.SetAsync(
|
||||
ColumbiaIntegrationConstants.SettingLastResult,
|
||||
result.Summary,
|
||||
updatedBy: "Columbia Sync");
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user