const { Worker } = require("bullmq") const Redis = require("ioredis") const { getRedisConnectionOptions } = require("../services/redis/connection") const voteDb = require("../db/vote.db") const { VOTE_HASH_KEY, COMMENT_LIKE_HASH_KEY, COMMENT_HASH_KEY, COMMENT_DELETE_HASH_KEY, DEAL_UPDATE_HASH_KEY, DEAL_CREATE_HASH_KEY, DEAL_AI_REVIEW_HASH_KEY, NOTIFICATION_HASH_KEY, NOTIFICATION_READ_HASH_KEY, DEAL_SAVE_HASH_KEY, AUDIT_HASH_KEY, USER_UPDATE_HASH_KEY, USER_NOTE_HASH_KEY, DEAL_REPORT_UPDATE_HASH_KEY, CATEGORY_UPSERT_HASH_KEY, SELLER_UPSERT_HASH_KEY, SELLER_DOMAIN_UPSERT_HASH_KEY, } = require("../services/redis/dbSync.service") const { DEAL_ANALYTICS_TOTAL_HASH_KEY } = require("../services/redis/dealAnalytics.service") const commentLikeDb = require("../db/commentLike.db") const dealAnalyticsDb = require("../db/dealAnalytics.db") const userInterestProfileDb = require("../db/userInterestProfile.db") const prisma = require("../db/client") const { getUserInterestIncrementHashKeys } = require("../services/userInterest.service") const USER_INTEREST_DB_APPLY_BATCH_SIZE = Math.max( 100, Number(process.env.USER_INTEREST_DB_APPLY_BATCH_SIZE) || 2000 ) function createRedisClient() { return new Redis(getRedisConnectionOptions()) } function normalizeJsonValue(value) { if (value === undefined || value === null) return null try { return JSON.parse(JSON.stringify(value)) } catch { return null } } async function consumeUserUpdates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, USER_UPDATE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const dedup = new Map() for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) const id = Number(parsed.userId) if (!id || !parsed?.data) continue const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date() const existing = dedup.get(id) if (!existing || updatedAt > existing.updatedAt) { dedup.set(id, { userId: id, data: parsed.data, updatedAt }) } } catch (err) { console.error("db-sync user update parse failed:", err?.message || err) } } const items = Array.from(dedup.values()) if (!items.length) return 0 let updated = 0 for (const item of items) { try { await prisma.user.update({ where: { id: item.userId }, data: { ...item.data, updatedAt: item.updatedAt }, }) updated += 1 } catch (err) { console.error("db-sync user update failed:", err?.message || err) } } return updated } async function consumeUserNotes(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, USER_NOTE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.userId || !parsed?.createdById || !parsed?.note) continue items.push({ userId: Number(parsed.userId), createdById: Number(parsed.createdById), note: String(parsed.note), createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(), }) } catch (err) { console.error("db-sync user note parse failed:", err?.message || err) } } if (!items.length) return 0 try { await prisma.userNote.createMany({ data: items }) return items.length } catch (err) { console.error("db-sync user note batch failed:", err?.message || err) return 0 } } async function consumeDealReportUpdates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, DEAL_REPORT_UPDATE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.reportId || !parsed?.status) continue items.push({ reportId: Number(parsed.reportId), status: String(parsed.status), updatedAt: parsed.updatedAt ? new Date(parsed.updatedAt) : new Date(), }) } catch (err) { console.error("db-sync dealReport update parse failed:", err?.message || err) } } if (!items.length) return 0 let updated = 0 for (const item of items) { try { await prisma.dealReport.update({ where: { id: item.reportId }, data: { status: item.status, updatedAt: item.updatedAt }, }) updated += 1 } catch (err) { console.error("db-sync dealReport update failed:", err?.message || err) } } return updated } async function consumeCategoryUpserts(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, CATEGORY_UPSERT_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.categoryId || !parsed?.data) continue items.push({ categoryId: Number(parsed.categoryId), data: parsed.data, }) } catch (err) { console.error("db-sync category upsert parse failed:", err?.message || err) } } if (!items.length) return 0 let updated = 0 for (const item of items) { try { await prisma.category.upsert({ where: { id: item.categoryId }, create: { id: item.categoryId, ...item.data }, update: item.data, }) updated += 1 } catch (err) { console.error("db-sync category upsert failed:", err?.message || err) } } return updated } async function consumeSellerUpserts(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, SELLER_UPSERT_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.sellerId || !parsed?.data) continue items.push({ sellerId: Number(parsed.sellerId), data: parsed.data, }) } catch (err) { console.error("db-sync seller upsert parse failed:", err?.message || err) } } if (!items.length) return 0 let updated = 0 for (const item of items) { try { await prisma.seller.upsert({ where: { id: item.sellerId }, create: { id: item.sellerId, ...item.data }, update: item.data, }) updated += 1 } catch (err) { console.error("db-sync seller upsert failed:", err?.message || err) } } return updated } async function consumeSellerDomainUpserts(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, SELLER_DOMAIN_UPSERT_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.sellerId || !parsed?.domain || !parsed?.createdById) continue items.push({ sellerId: Number(parsed.sellerId), domain: String(parsed.domain).toLowerCase(), createdById: Number(parsed.createdById), }) } catch (err) { console.error("db-sync seller domain parse failed:", err?.message || err) } } if (!items.length) return 0 let created = 0 for (const item of items) { try { await prisma.sellerDomain.upsert({ where: { domain: item.domain }, create: { domain: item.domain, sellerId: item.sellerId, createdById: item.createdById, }, update: { sellerId: item.sellerId }, }) created += 1 } catch (err) { console.error("db-sync seller domain upsert failed:", err?.message || err) } } return created } async function consumeVoteUpdates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, VOTE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const dedup = new Map() for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) const key = `${parsed.dealId}:${parsed.userId}` const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date() const existing = dedup.get(key) if (!existing || createdAt > existing.createdAt) { dedup.set(key, { dealId: Number(parsed.dealId), userId: Number(parsed.userId), voteType: Number(parsed.voteType), createdAt, }) } } catch (err) { console.error("db-sync vote parse/update failed:", err?.message || err) } } const batch = Array.from(dedup.values()) if (!batch.length) return 0 const result = await voteDb.voteDealBatchTx(batch) return result?.count ?? batch.length } async function consumeUserInterestIncrements(redis) { const hashKeys = getUserInterestIncrementHashKeys() if (!hashKeys.length) return 0 const increments = [] for (const hashKey of hashKeys) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, hashKey ) if (!data || data.length === 0) continue for (let i = 0; i < data.length; i += 2) { try { const field = String(data[i] || "") const points = Number(data[i + 1] || 0) if (!field || !Number.isFinite(points) || points <= 0) continue const [userIdRaw, categoryIdRaw] = field.split(":") const userId = Number(userIdRaw) const categoryId = Number(categoryIdRaw) if (!Number.isInteger(userId) || userId <= 0) continue if (!Number.isInteger(categoryId) || categoryId <= 0) continue increments.push({ userId, categoryId, points: Math.floor(points), }) } catch (err) { console.error("db-sync userInterest parse failed:", err?.message || err) } } } if (!increments.length) return 0 let updated = 0 for (let i = 0; i < increments.length; i += USER_INTEREST_DB_APPLY_BATCH_SIZE) { const chunk = increments.slice(i, i + USER_INTEREST_DB_APPLY_BATCH_SIZE) try { const result = await userInterestProfileDb.applyInterestIncrementsBatch(chunk) updated += Number(result?.updated || 0) } catch (err) { console.error("db-sync userInterest batch failed:", err?.message || err) } } return updated } async function consumeCommentLikeUpdates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, COMMENT_LIKE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const dedup = new Map() for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) const key = `${parsed.commentId}:${parsed.userId}` const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date() const existing = dedup.get(key) if (!existing || createdAt > existing.createdAt) { dedup.set(key, { commentId: Number(parsed.commentId), userId: Number(parsed.userId), like: Boolean(parsed.like), createdAt, }) } } catch (err) { console.error("db-sync commentLike parse failed:", err?.message || err) } } const batch = Array.from(dedup.values()) if (!batch.length) return 0 try { const result = await commentLikeDb.applyCommentLikeBatch(batch) return (result?.inserted || 0) + (result?.deleted || 0) } catch (err) { console.error("db-sync commentLike batch failed:", err?.message || err) return 0 } } async function consumeCommentCreates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, COMMENT_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) items.push({ id: Number(parsed.commentId), dealId: Number(parsed.dealId), userId: Number(parsed.userId), text: String(parsed.text || ""), parentId: parsed.parentId ? Number(parsed.parentId) : null, createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(), }) } catch (err) { console.error("db-sync comment parse failed:", err?.message || err) } } if (!items.length) return 0 return prisma.$transaction(async (tx) => { let created = 0 for (const item of items) { try { await tx.comment.create({ data: { id: item.id, dealId: item.dealId, userId: item.userId, text: item.text, parentId: item.parentId, createdAt: item.createdAt, }, }) await tx.deal.update({ where: { id: item.dealId }, data: { commentCount: { increment: 1 } }, }) created += 1 } catch (err) { // ignore duplicates } } return created }) } async function consumeCommentDeletes(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, COMMENT_DELETE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) items.push({ commentId: Number(parsed.commentId), dealId: Number(parsed.dealId), }) } catch (err) { console.error("db-sync comment delete parse failed:", err?.message || err) } } if (!items.length) return 0 return prisma.$transaction(async (tx) => { let deleted = 0 for (const item of items) { const result = await tx.comment.updateMany({ where: { id: item.commentId, deletedAt: null }, data: { deletedAt: new Date() }, }) if (result.count > 0) { await tx.deal.update({ where: { id: item.dealId }, data: { commentCount: { decrement: 1 } }, }) deleted += 1 } } return deleted }) } async function consumeDealAnalyticsTotals(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, DEAL_ANALYTICS_TOTAL_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const increments = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.dealId) continue increments.push({ dealId: Number(parsed.dealId), impressions: Number(parsed.impressions || 0), views: Number(parsed.views || 0), clicks: Number(parsed.clicks || 0), }) } catch (err) { console.error("db-sync dealAnalyticsTotals parse failed:", err?.message || err) } } if (!increments.length) return 0 try { const result = await dealAnalyticsDb.applyDealTotalsBatch(increments) return result?.updated ?? 0 } catch (err) { console.error("db-sync dealAnalyticsTotals batch failed:", err?.message || err) return 0 } } function normalizeDealCreateData(data = {}) { return { id: Number(data.id), title: String(data.title || ""), description: data.description ?? null, url: data.url ?? null, price: data.price ?? null, originalPrice: data.originalPrice ?? null, shippingPrice: data.shippingPrice ?? null, percentOff: data.percentOff ?? null, couponCode: data.couponCode ?? null, location: data.location ?? null, discountType: data.discountType ?? null, discountValue: data.discountValue ?? null, maxNotifiedMilestone: Number.isFinite(Number(data.maxNotifiedMilestone)) ? Number(data.maxNotifiedMilestone) : 0, userId: Number(data.userId), status: String(data.status || "PENDING"), saletype: String(data.saletype || "ONLINE"), affiliateType: String(data.affiliateType || "NON_AFFILIATE"), sellerId: data.sellerId ? Number(data.sellerId) : null, customSeller: data.customSeller ?? null, categoryId: Number.isInteger(Number(data.categoryId)) ? Number(data.categoryId) : 0, createdAt: data.createdAt ? new Date(data.createdAt) : new Date(), updatedAt: data.updatedAt ? new Date(data.updatedAt) : new Date(), } } async function consumeDealCreates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, DEAL_CREATE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.dealId || !parsed?.data) continue items.push({ dealId: Number(parsed.dealId), data: normalizeDealCreateData(parsed.data), images: Array.isArray(parsed.images) ? parsed.images : [], }) } catch (err) { console.error("db-sync deal create parse failed:", err?.message || err) } } if (!items.length) return 0 let created = 0 for (const item of items) { try { const data = { ...item.data, id: item.dealId } await prisma.deal.create({ data }) await dealAnalyticsDb.ensureTotalsForDealIds([item.dealId]) if (item.images.length) { const imagesData = item.images.map((img) => ({ dealId: item.dealId, imageUrl: String(img.imageUrl || ""), order: Number(img.order || 0), })) await prisma.dealImage.createMany({ data: imagesData }) } created += 1 } catch (err) { console.error("db-sync deal create failed:", err?.message || err) } } if (created > 0) { await prisma.$executeRawUnsafe( 'SELECT setval(pg_get_serial_sequence(\'"Deal"\', \'id\'), (SELECT MAX(id) FROM "Deal"))' ) } return created } async function consumeDealAiReviewUpdates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, DEAL_AI_REVIEW_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const dedup = new Map() for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) const dealId = Number(parsed.dealId) if (!dealId || !parsed?.data) continue const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date() const existing = dedup.get(dealId) if (!existing || updatedAt > existing.updatedAt) { dedup.set(dealId, { dealId, data: parsed.data, updatedAt, }) } } catch (err) { console.error("db-sync deal aiReview parse failed:", err?.message || err) } } const batch = Array.from(dedup.values()) if (!batch.length) return 0 let updated = 0 for (const item of batch) { try { await prisma.dealAiReview.upsert({ where: { dealId: item.dealId }, create: { dealId: item.dealId, bestCategoryId: Number(item.data.bestCategoryId) || 0, tags: Array.isArray(item.data.tags) ? item.data.tags : [], needsReview: Boolean(item.data.needsReview), hasIssue: Boolean(item.data.hasIssue), issueType: String(item.data.issueType || "NONE"), issueReason: item.data.issueReason ?? null, }, update: { bestCategoryId: Number(item.data.bestCategoryId) || 0, tags: Array.isArray(item.data.tags) ? item.data.tags : [], needsReview: Boolean(item.data.needsReview), hasIssue: Boolean(item.data.hasIssue), issueType: String(item.data.issueType || "NONE"), issueReason: item.data.issueReason ?? null, }, }) updated += 1 } catch (err) { console.error("db-sync deal aiReview update failed:", err?.message || err) } } return updated } const DEAL_UPDATE_FIELDS = new Set([ "title", "description", "url", "price", "originalPrice", "shippingPrice", "couponCode", "location", "discountType", "discountValue", "maxNotifiedMilestone", "status", "saletype", "affiliateType", "sellerId", "customSeller", "categoryId", "userId", ]) function sanitizeDealUpdate(data) { const patch = {} if (!data || typeof data !== "object") return patch for (const [key, value] of Object.entries(data)) { if (!DEAL_UPDATE_FIELDS.has(key)) continue patch[key] = value } return patch } async function consumeDealUpdates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, DEAL_UPDATE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const dedup = new Map() for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) const id = Number(parsed.dealId) if (!id) continue const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date() const existing = dedup.get(id) if (!existing || updatedAt > existing.updatedAt) { dedup.set(id, { dealId: id, data: sanitizeDealUpdate(parsed.data), updatedAt, }) } } catch (err) { console.error("db-sync deal update parse failed:", err?.message || err) } } const batch = Array.from(dedup.values()).filter((item) => Object.keys(item.data).length) if (!batch.length) return 0 let updated = 0 for (const item of batch) { try { await prisma.deal.update({ where: { id: item.dealId }, data: { ...item.data, updatedAt: item.updatedAt }, }) updated += 1 } catch (err) { console.error("db-sync deal update failed:", err?.message || err) } } return updated } async function consumeNotifications(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, NOTIFICATION_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.userId || !parsed?.message) continue items.push({ userId: Number(parsed.userId), message: String(parsed.message), type: String(parsed.type || "INFO"), extras: normalizeJsonValue(parsed.extras), createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(), }) } catch (err) { console.error("db-sync notification parse failed:", err?.message || err) } } if (!items.length) return 0 let created = 0 for (const item of items) { try { await prisma.$transaction(async (tx) => { await tx.notification.create({ data: { userId: item.userId, message: item.message, type: item.type, extras: item.extras, createdAt: item.createdAt, }, }) await tx.user.update({ where: { id: item.userId }, data: { notificationCount: { increment: 1 } }, }) }) created += 1 } catch (err) { console.error("db-sync notification create failed:", err?.message || err) } } return created } async function consumeNotificationReads(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, NOTIFICATION_READ_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const dedup = new Map() for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.userId) continue const readAt = parsed.readAt ? new Date(parsed.readAt) : new Date() const existing = dedup.get(parsed.userId) if (!existing || readAt > existing.readAt) { dedup.set(parsed.userId, { userId: Number(parsed.userId), readAt }) } } catch (err) { console.error("db-sync notification read parse failed:", err?.message || err) } } const items = Array.from(dedup.values()) if (!items.length) return 0 let updated = 0 for (const item of items) { try { const result = await prisma.notification.updateMany({ where: { userId: item.userId, readAt: null, createdAt: { lte: item.readAt }, }, data: { readAt: item.readAt }, }) updated += result?.count || 0 } catch (err) { console.error("db-sync notification read update failed:", err?.message || err) } } return updated } async function consumeDealSaveUpdates(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, DEAL_SAVE_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const dedup = new Map() for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) const key = `${parsed.dealId}:${parsed.userId}` const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date() const existing = dedup.get(key) if (!existing || createdAt > existing.createdAt) { dedup.set(key, { dealId: Number(parsed.dealId), userId: Number(parsed.userId), action: String(parsed.action || "SAVE").toUpperCase(), createdAt, }) } } catch (err) { console.error("db-sync dealSave parse failed:", err?.message || err) } } const items = Array.from(dedup.values()) if (!items.length) return 0 const saves = items.filter((i) => i.action === "SAVE") const unsaves = items.filter((i) => i.action === "UNSAVE") try { await prisma.$transaction(async (tx) => { if (saves.length) { await tx.dealSave.createMany({ data: saves.map((i) => ({ dealId: i.dealId, userId: i.userId, createdAt: i.createdAt, })), skipDuplicates: true, }) } if (unsaves.length) { await tx.dealSave.deleteMany({ where: { OR: unsaves.map((i) => ({ dealId: i.dealId, userId: i.userId, })), }, }) } }) } catch (err) { console.error("db-sync dealSave batch failed:", err?.message || err) return 0 } return items.length } async function consumeAuditEvents(redis) { const data = await redis.eval( "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;", 1, AUDIT_HASH_KEY ) if (!data || data.length === 0) return 0 const pairs = {} for (let i = 0; i < data.length; i += 2) { pairs[data[i]] = data[i + 1] } const items = [] for (const payload of Object.values(pairs)) { try { const parsed = JSON.parse(payload) if (!parsed?.action) continue items.push({ userId: parsed.userId ? Number(parsed.userId) : null, action: String(parsed.action), ip: parsed.ip ?? null, userAgent: parsed.userAgent ?? null, meta: parsed.meta ?? null, createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(), }) } catch (err) { console.error("db-sync audit parse failed:", err?.message || err) } } if (!items.length) return 0 try { await prisma.auditEvent.createMany({ data: items }) } catch (err) { console.error("db-sync audit batch failed:", err?.message || err) return 0 } return items.length } async function handler() { const redis = createRedisClient() try { const commentCreateCount = await consumeCommentCreates(redis) const commentLikeCount = await consumeCommentLikeUpdates(redis) const commentDeleteCount = await consumeCommentDeletes(redis) const dealSaveCount = await consumeDealSaveUpdates(redis) const dealEventCount = await consumeDealAnalyticsTotals(redis) const dealCreateCount = await consumeDealCreates(redis) const dealAiReviewCount = await consumeDealAiReviewUpdates(redis) const notificationReadCount = await consumeNotificationReads(redis) const notificationCount = await consumeNotifications(redis) const dealUpdateCount = await consumeDealUpdates(redis) const voteCount = await consumeVoteUpdates(redis) const auditCount = await consumeAuditEvents(redis) const userUpdateCount = await consumeUserUpdates(redis) const userNoteCount = await consumeUserNotes(redis) const dealReportUpdateCount = await consumeDealReportUpdates(redis) const categoryUpsertCount = await consumeCategoryUpserts(redis) const sellerUpsertCount = await consumeSellerUpserts(redis) const sellerDomainUpsertCount = await consumeSellerDomainUpserts(redis) const userInterestCount = await consumeUserInterestIncrements(redis) return { votes: voteCount, commentLikes: commentLikeCount, commentsCreated: commentCreateCount, commentsDeleted: commentDeleteCount, dealSaves: dealSaveCount, dealEvents: dealEventCount, dealCreates: dealCreateCount, dealAiReviews: dealAiReviewCount, notificationsRead: notificationReadCount, notifications: notificationCount, dealUpdates: dealUpdateCount, audits: auditCount, userUpdates: userUpdateCount, userNotes: userNoteCount, dealReportUpdates: dealReportUpdateCount, categoryUpserts: categoryUpsertCount, sellerUpserts: sellerUpsertCount, sellerDomainUpserts: sellerDomainUpsertCount, userInterests: userInterestCount, } } finally { redis.disconnect() } } function startDbSyncWorker() { const worker = new Worker("db-sync", handler, { connection: getRedisConnectionOptions(), concurrency: 1, }) worker.on("completed", (job) => { console.log( `DB sync batch done. Votes: ${job.returnvalue?.votes ?? 0} CommentLikes: ${job.returnvalue?.commentLikes ?? 0} CommentsCreated: ${job.returnvalue?.commentsCreated ?? 0} CommentsDeleted: ${job.returnvalue?.commentsDeleted ?? 0} DealSaves: ${job.returnvalue?.dealSaves ?? 0} DealEvents: ${job.returnvalue?.dealEvents ?? 0} DealCreates: ${job.returnvalue?.dealCreates ?? 0} DealAiReviews: ${job.returnvalue?.dealAiReviews ?? 0} NotificationsRead: ${job.returnvalue?.notificationsRead ?? 0} Notifications: ${job.returnvalue?.notifications ?? 0} DealUpdates: ${job.returnvalue?.dealUpdates ?? 0} Audits: ${job.returnvalue?.audits ?? 0} UserUpdates: ${job.returnvalue?.userUpdates ?? 0} UserNotes: ${job.returnvalue?.userNotes ?? 0} DealReportUpdates: ${job.returnvalue?.dealReportUpdates ?? 0} CategoryUpserts: ${job.returnvalue?.categoryUpserts ?? 0} SellerUpserts: ${job.returnvalue?.sellerUpserts ?? 0} SellerDomainUpserts: ${job.returnvalue?.sellerDomainUpserts ?? 0} UserInterests: ${job.returnvalue?.userInterests ?? 0}` ) }) worker.on("failed", (job, err) => { console.error(`❌ DB sync batch failed: ${job?.id}`, err?.message) }) return worker } module.exports = { startDbSyncWorker }