// HotTRDealsBackend/workers/dbSync.worker.js
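// db-sync worker: drains write-behind buffers (Redis hashes written elsewhere
// in the app) into Postgres via Prisma. Each consumer atomically reads and
// clears one hash before writing, so a DB failure after the drain drops that
// batch (at-most-once delivery); consumers therefore log failures and
// continue rather than rethrow.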

const { Worker } = require("bullmq")
const Redis = require("ioredis")
const { getRedisConnectionOptions } = require("../services/redis/connection")
const voteDb = require("../db/vote.db")
const {
VOTE_HASH_KEY,
COMMENT_LIKE_HASH_KEY,
COMMENT_HASH_KEY,
COMMENT_DELETE_HASH_KEY,
DEAL_UPDATE_HASH_KEY,
DEAL_CREATE_HASH_KEY,
DEAL_AI_REVIEW_HASH_KEY,
NOTIFICATION_HASH_KEY,
NOTIFICATION_READ_HASH_KEY,
DEAL_SAVE_HASH_KEY,
AUDIT_HASH_KEY,
USER_UPDATE_HASH_KEY,
USER_NOTE_HASH_KEY,
DEAL_REPORT_UPDATE_HASH_KEY,
CATEGORY_UPSERT_HASH_KEY,
SELLER_UPSERT_HASH_KEY,
SELLER_DOMAIN_UPSERT_HASH_KEY,
} = require("../services/redis/dbSync.service")
const { DEAL_ANALYTICS_TOTAL_HASH_KEY } = require("../services/redis/dealAnalytics.service")
const commentLikeDb = require("../db/commentLike.db")
const dealAnalyticsDb = require("../db/dealAnalytics.db")
const userInterestProfileDb = require("../db/userInterestProfile.db")
const prisma = require("../db/client")
const { getUserInterestIncrementHashKeys } = require("../services/userInterest.service")
// Chunk size for applying user-interest increments to the DB (env-tunable,
// with a floor of 100).
const USER_INTEREST_DB_APPLY_BATCH_SIZE = Math.max(
  100,
  Number(process.env.USER_INTEREST_DB_APPLY_BATCH_SIZE) || 2000
)
function createRedisClient() {
  return new Redis(getRedisConnectionOptions())
}
// Round-trip through JSON to strip undefined values, functions, and cycles
// before the value is stored in a Prisma Json column.
function normalizeJsonValue(value) {
  if (value === undefined || value === null) return null
  try {
    return JSON.parse(JSON.stringify(value))
  } catch {
    return null
  }
}
// Atomic read-and-clear: HGETALL + DEL run as one server-side Lua step, so a
// concurrent writer can never add a field between the read and the delete.
const DRAIN_HASH_LUA =
  "local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;"
// Drain a buffer hash and return its field/value pairs as a plain object.
// Every consumer below starts with this step.
async function drainHash(redis, hashKey) {
  const data = await redis.eval(DRAIN_HASH_LUA, 1, hashKey)
  const pairs = {}
  if (!data) return pairs
  for (let i = 0; i < data.length; i += 2) {
    pairs[data[i]] = data[i + 1]
  }
  return pairs
}
async function consumeUserUpdates(redis) {
  const pairs = await drainHash(redis, USER_UPDATE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const dedup = new Map()
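  // Several updates may be buffered for one user; keep only the newest.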
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
const id = Number(parsed.userId)
if (!id || !parsed?.data) continue
const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date()
const existing = dedup.get(id)
if (!existing || updatedAt > existing.updatedAt) {
dedup.set(id, { userId: id, data: parsed.data, updatedAt })
}
} catch (err) {
console.error("db-sync user update parse failed:", err?.message || err)
}
}
const items = Array.from(dedup.values())
if (!items.length) return 0
let updated = 0
for (const item of items) {
try {
await prisma.user.update({
where: { id: item.userId },
data: { ...item.data, updatedAt: item.updatedAt },
})
updated += 1
} catch (err) {
console.error("db-sync user update failed:", err?.message || err)
}
}
return updated
}
async function consumeUserNotes(redis) {
  const pairs = await drainHash(redis, USER_NOTE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.userId || !parsed?.createdById || !parsed?.note) continue
items.push({
userId: Number(parsed.userId),
createdById: Number(parsed.createdById),
note: String(parsed.note),
createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(),
})
} catch (err) {
console.error("db-sync user note parse failed:", err?.message || err)
}
}
if (!items.length) return 0
try {
await prisma.userNote.createMany({ data: items })
return items.length
} catch (err) {
console.error("db-sync user note batch failed:", err?.message || err)
return 0
}
}
async function consumeDealReportUpdates(redis) {
  const pairs = await drainHash(redis, DEAL_REPORT_UPDATE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.reportId || !parsed?.status) continue
items.push({
reportId: Number(parsed.reportId),
status: String(parsed.status),
updatedAt: parsed.updatedAt ? new Date(parsed.updatedAt) : new Date(),
})
} catch (err) {
console.error("db-sync dealReport update parse failed:", err?.message || err)
}
}
if (!items.length) return 0
let updated = 0
for (const item of items) {
try {
await prisma.dealReport.update({
where: { id: item.reportId },
data: { status: item.status, updatedAt: item.updatedAt },
})
updated += 1
} catch (err) {
console.error("db-sync dealReport update failed:", err?.message || err)
}
}
return updated
}
async function consumeCategoryUpserts(redis) {
  const pairs = await drainHash(redis, CATEGORY_UPSERT_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.categoryId || !parsed?.data) continue
items.push({
categoryId: Number(parsed.categoryId),
data: parsed.data,
})
} catch (err) {
console.error("db-sync category upsert parse failed:", err?.message || err)
}
}
if (!items.length) return 0
let updated = 0
for (const item of items) {
try {
await prisma.category.upsert({
where: { id: item.categoryId },
create: { id: item.categoryId, ...item.data },
update: item.data,
})
updated += 1
} catch (err) {
console.error("db-sync category upsert failed:", err?.message || err)
}
}
return updated
}
async function consumeSellerUpserts(redis) {
  const pairs = await drainHash(redis, SELLER_UPSERT_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.sellerId || !parsed?.data) continue
items.push({
sellerId: Number(parsed.sellerId),
data: parsed.data,
})
} catch (err) {
console.error("db-sync seller upsert parse failed:", err?.message || err)
}
}
if (!items.length) return 0
let updated = 0
for (const item of items) {
try {
await prisma.seller.upsert({
where: { id: item.sellerId },
create: { id: item.sellerId, ...item.data },
update: item.data,
})
updated += 1
} catch (err) {
console.error("db-sync seller upsert failed:", err?.message || err)
}
}
return updated
}
async function consumeSellerDomainUpserts(redis) {
  const pairs = await drainHash(redis, SELLER_DOMAIN_UPSERT_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.sellerId || !parsed?.domain || !parsed?.createdById) continue
items.push({
sellerId: Number(parsed.sellerId),
domain: String(parsed.domain).toLowerCase(),
createdById: Number(parsed.createdById),
})
} catch (err) {
console.error("db-sync seller domain parse failed:", err?.message || err)
}
}
if (!items.length) return 0
let created = 0
for (const item of items) {
try {
await prisma.sellerDomain.upsert({
where: { domain: item.domain },
create: {
domain: item.domain,
sellerId: item.sellerId,
createdById: item.createdById,
},
update: { sellerId: item.sellerId },
})
created += 1
} catch (err) {
console.error("db-sync seller domain upsert failed:", err?.message || err)
}
}
return created
}
async function consumeVoteUpdates(redis) {
  const pairs = await drainHash(redis, VOTE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const dedup = new Map()
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
const key = `${parsed.dealId}:${parsed.userId}`
const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date()
const existing = dedup.get(key)
if (!existing || createdAt > existing.createdAt) {
dedup.set(key, {
dealId: Number(parsed.dealId),
userId: Number(parsed.userId),
voteType: Number(parsed.voteType),
createdAt,
})
}
} catch (err) {
console.error("db-sync vote parse/update failed:", err?.message || err)
}
}
  const batch = Array.from(dedup.values())
  if (!batch.length) return 0
  try {
    const result = await voteDb.voteDealBatchTx(batch)
    return result?.count ?? batch.length
  } catch (err) {
    // Match the other consumers: log and drop the batch instead of failing
    // the whole sync job.
    console.error("db-sync vote batch failed:", err?.message || err)
    return 0
  }
}
async function consumeUserInterestIncrements(redis) {
const hashKeys = getUserInterestIncrementHashKeys()
if (!hashKeys.length) return 0
const increments = []
for (const hashKey of hashKeys) {
    const pairs = await drainHash(redis, hashKey)
    // Fields are "userId:categoryId", values are accumulated points.
    for (const [fieldRaw, value] of Object.entries(pairs)) {
      try {
        const field = String(fieldRaw || "")
        const points = Number(value || 0)
        if (!field || !Number.isFinite(points) || points <= 0) continue
        const [userIdRaw, categoryIdRaw] = field.split(":")
const userId = Number(userIdRaw)
const categoryId = Number(categoryIdRaw)
if (!Number.isInteger(userId) || userId <= 0) continue
if (!Number.isInteger(categoryId) || categoryId <= 0) continue
increments.push({
userId,
categoryId,
points: Math.floor(points),
})
} catch (err) {
console.error("db-sync userInterest parse failed:", err?.message || err)
}
}
}
if (!increments.length) return 0
let updated = 0
for (let i = 0; i < increments.length; i += USER_INTEREST_DB_APPLY_BATCH_SIZE) {
const chunk = increments.slice(i, i + USER_INTEREST_DB_APPLY_BATCH_SIZE)
try {
const result = await userInterestProfileDb.applyInterestIncrementsBatch(chunk)
updated += Number(result?.updated || 0)
} catch (err) {
console.error("db-sync userInterest batch failed:", err?.message || err)
}
}
return updated
}
async function consumeCommentLikeUpdates(redis) {
  const pairs = await drainHash(redis, COMMENT_LIKE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const dedup = new Map()
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
const key = `${parsed.commentId}:${parsed.userId}`
const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date()
const existing = dedup.get(key)
if (!existing || createdAt > existing.createdAt) {
dedup.set(key, {
commentId: Number(parsed.commentId),
userId: Number(parsed.userId),
like: Boolean(parsed.like),
createdAt,
})
}
} catch (err) {
console.error("db-sync commentLike parse failed:", err?.message || err)
}
}
const batch = Array.from(dedup.values())
if (!batch.length) return 0
try {
const result = await commentLikeDb.applyCommentLikeBatch(batch)
return (result?.inserted || 0) + (result?.deleted || 0)
} catch (err) {
console.error("db-sync commentLike batch failed:", err?.message || err)
return 0
}
}
async function consumeCommentCreates(redis) {
  const pairs = await drainHash(redis, COMMENT_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
items.push({
id: Number(parsed.commentId),
dealId: Number(parsed.dealId),
userId: Number(parsed.userId),
text: String(parsed.text || ""),
parentId: parsed.parentId ? Number(parsed.parentId) : null,
createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(),
})
} catch (err) {
console.error("db-sync comment parse failed:", err?.message || err)
}
}
  if (!items.length) return 0
  let created = 0
  for (const item of items) {
    try {
      // One transaction per comment: with a single shared transaction, the
      // first failed insert aborts the whole Postgres transaction, so every
      // later statement and the final commit fail with it.
      await prisma.$transaction(async (tx) => {
        await tx.comment.create({
          data: {
            id: item.id,
            dealId: item.dealId,
            userId: item.userId,
            text: item.text,
            parentId: item.parentId,
            createdAt: item.createdAt,
          },
        })
        await tx.deal.update({
          where: { id: item.dealId },
          data: { commentCount: { increment: 1 } },
        })
      })
      created += 1
    } catch (err) {
      // Duplicate ids are expected on replay; log everything else too so
      // real failures are not silently swallowed.
      console.error("db-sync comment create failed:", err?.message || err)
    }
  }
  return created
}
async function consumeCommentDeletes(redis) {
  const pairs = await drainHash(redis, COMMENT_DELETE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
items.push({
commentId: Number(parsed.commentId),
dealId: Number(parsed.dealId),
})
} catch (err) {
console.error("db-sync comment delete parse failed:", err?.message || err)
}
}
if (!items.length) return 0
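  // Soft-delete: stamp deletedAt only if not already deleted, and decrement
  // the deal's commentCount only when a row actually changed, keeping
  // replays idempotent.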
return prisma.$transaction(async (tx) => {
let deleted = 0
for (const item of items) {
const result = await tx.comment.updateMany({
where: { id: item.commentId, deletedAt: null },
data: { deletedAt: new Date() },
})
if (result.count > 0) {
await tx.deal.update({
where: { id: item.dealId },
data: { commentCount: { decrement: 1 } },
})
deleted += 1
}
}
return deleted
})
}
async function consumeDealAnalyticsTotals(redis) {
  const pairs = await drainHash(redis, DEAL_ANALYTICS_TOTAL_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const increments = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.dealId) continue
increments.push({
dealId: Number(parsed.dealId),
impressions: Number(parsed.impressions || 0),
views: Number(parsed.views || 0),
clicks: Number(parsed.clicks || 0),
})
} catch (err) {
console.error("db-sync dealAnalyticsTotals parse failed:", err?.message || err)
}
}
if (!increments.length) return 0
try {
const result = await dealAnalyticsDb.applyDealTotalsBatch(increments)
return result?.updated ?? 0
} catch (err) {
console.error("db-sync dealAnalyticsTotals batch failed:", err?.message || err)
return 0
}
}
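// Coerce a buffered deal-create payload into the Prisma Deal shape, filling
// safe defaults for anything the producer omitted.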
function normalizeDealCreateData(data = {}) {
return {
id: Number(data.id),
title: String(data.title || ""),
description: data.description ?? null,
url: data.url ?? null,
price: data.price ?? null,
originalPrice: data.originalPrice ?? null,
shippingPrice: data.shippingPrice ?? null,
percentOff: data.percentOff ?? null,
couponCode: data.couponCode ?? null,
location: data.location ?? null,
discountType: data.discountType ?? null,
discountValue: data.discountValue ?? null,
maxNotifiedMilestone: Number.isFinite(Number(data.maxNotifiedMilestone))
? Number(data.maxNotifiedMilestone)
: 0,
userId: Number(data.userId),
status: String(data.status || "PENDING"),
saletype: String(data.saletype || "ONLINE"),
affiliateType: String(data.affiliateType || "NON_AFFILIATE"),
sellerId: data.sellerId ? Number(data.sellerId) : null,
customSeller: data.customSeller ?? null,
categoryId: Number.isInteger(Number(data.categoryId)) ? Number(data.categoryId) : 0,
createdAt: data.createdAt ? new Date(data.createdAt) : new Date(),
updatedAt: data.updatedAt ? new Date(data.updatedAt) : new Date(),
}
}
async function consumeDealCreates(redis) {
  const pairs = await drainHash(redis, DEAL_CREATE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.dealId || !parsed?.data) continue
items.push({
dealId: Number(parsed.dealId),
data: normalizeDealCreateData(parsed.data),
images: Array.isArray(parsed.images) ? parsed.images : [],
})
} catch (err) {
console.error("db-sync deal create parse failed:", err?.message || err)
}
}
if (!items.length) return 0
let created = 0
for (const item of items) {
try {
const data = { ...item.data, id: item.dealId }
await prisma.deal.create({ data })
await dealAnalyticsDb.ensureTotalsForDealIds([item.dealId])
if (item.images.length) {
const imagesData = item.images.map((img) => ({
dealId: item.dealId,
imageUrl: String(img.imageUrl || ""),
order: Number(img.order || 0),
}))
await prisma.dealImage.createMany({ data: imagesData })
}
created += 1
} catch (err) {
console.error("db-sync deal create failed:", err?.message || err)
}
}
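  // Deal ids were assigned outside Postgres, so resync the serial sequence
  // after explicit-id inserts to keep DB-side inserts from colliding.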
if (created > 0) {
await prisma.$executeRawUnsafe(
'SELECT setval(pg_get_serial_sequence(\'"Deal"\', \'id\'), (SELECT MAX(id) FROM "Deal"))'
)
}
return created
}
async function consumeDealAiReviewUpdates(redis) {
  const pairs = await drainHash(redis, DEAL_AI_REVIEW_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const dedup = new Map()
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
const dealId = Number(parsed.dealId)
if (!dealId || !parsed?.data) continue
const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date()
const existing = dedup.get(dealId)
if (!existing || updatedAt > existing.updatedAt) {
dedup.set(dealId, {
dealId,
data: parsed.data,
updatedAt,
})
}
} catch (err) {
console.error("db-sync deal aiReview parse failed:", err?.message || err)
}
}
const batch = Array.from(dedup.values())
if (!batch.length) return 0
let updated = 0
  for (const item of batch) {
    try {
      // create and update share the same column values; build them once.
      const fields = {
        bestCategoryId: Number(item.data.bestCategoryId) || 0,
        tags: Array.isArray(item.data.tags) ? item.data.tags : [],
        needsReview: Boolean(item.data.needsReview),
        hasIssue: Boolean(item.data.hasIssue),
        issueType: String(item.data.issueType || "NONE"),
        issueReason: item.data.issueReason ?? null,
      }
      await prisma.dealAiReview.upsert({
        where: { dealId: item.dealId },
        create: { dealId: item.dealId, ...fields },
        update: fields,
      })
      updated += 1
    } catch (err) {
      console.error("db-sync deal aiReview update failed:", err?.message || err)
    }
  }
return updated
}
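// Whitelist of Deal columns a buffered update may touch; sanitizeDealUpdate
// drops everything else from the payload.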
const DEAL_UPDATE_FIELDS = new Set([
"title",
"description",
"url",
"price",
"originalPrice",
"shippingPrice",
"couponCode",
"location",
"discountType",
"discountValue",
"maxNotifiedMilestone",
"status",
"saletype",
"affiliateType",
"sellerId",
"customSeller",
"categoryId",
"userId",
])
function sanitizeDealUpdate(data) {
const patch = {}
if (!data || typeof data !== "object") return patch
for (const [key, value] of Object.entries(data)) {
if (!DEAL_UPDATE_FIELDS.has(key)) continue
patch[key] = value
}
return patch
}
async function consumeDealUpdates(redis) {
  const pairs = await drainHash(redis, DEAL_UPDATE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const dedup = new Map()
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
const id = Number(parsed.dealId)
if (!id) continue
const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date()
const existing = dedup.get(id)
if (!existing || updatedAt > existing.updatedAt) {
dedup.set(id, {
dealId: id,
data: sanitizeDealUpdate(parsed.data),
updatedAt,
})
}
} catch (err) {
console.error("db-sync deal update parse failed:", err?.message || err)
}
}
const batch = Array.from(dedup.values()).filter((item) => Object.keys(item.data).length)
if (!batch.length) return 0
let updated = 0
for (const item of batch) {
try {
await prisma.deal.update({
where: { id: item.dealId },
data: { ...item.data, updatedAt: item.updatedAt },
})
updated += 1
} catch (err) {
console.error("db-sync deal update failed:", err?.message || err)
}
}
return updated
}
async function consumeNotifications(redis) {
  const pairs = await drainHash(redis, NOTIFICATION_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.userId || !parsed?.message) continue
items.push({
userId: Number(parsed.userId),
message: String(parsed.message),
type: String(parsed.type || "INFO"),
extras: normalizeJsonValue(parsed.extras),
createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(),
})
} catch (err) {
console.error("db-sync notification parse failed:", err?.message || err)
}
}
if (!items.length) return 0
let created = 0
for (const item of items) {
try {
await prisma.$transaction(async (tx) => {
await tx.notification.create({
data: {
userId: item.userId,
message: item.message,
type: item.type,
extras: item.extras,
createdAt: item.createdAt,
},
})
await tx.user.update({
where: { id: item.userId },
data: { notificationCount: { increment: 1 } },
})
})
created += 1
} catch (err) {
console.error("db-sync notification create failed:", err?.message || err)
}
}
return created
}
async function consumeNotificationReads(redis) {
  const pairs = await drainHash(redis, NOTIFICATION_READ_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const dedup = new Map()
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.userId) continue
const readAt = parsed.readAt ? new Date(parsed.readAt) : new Date()
const existing = dedup.get(parsed.userId)
if (!existing || readAt > existing.readAt) {
dedup.set(parsed.userId, { userId: Number(parsed.userId), readAt })
}
} catch (err) {
console.error("db-sync notification read parse failed:", err?.message || err)
}
}
const items = Array.from(dedup.values())
if (!items.length) return 0
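  // "Mark all read" semantics: stamp readAt on unread notifications created
  // at or before the buffered read time; anything newer stays unread.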
let updated = 0
for (const item of items) {
try {
const result = await prisma.notification.updateMany({
where: {
userId: item.userId,
readAt: null,
createdAt: { lte: item.readAt },
},
data: { readAt: item.readAt },
})
updated += result?.count || 0
} catch (err) {
console.error("db-sync notification read update failed:", err?.message || err)
}
}
return updated
}
async function consumeDealSaveUpdates(redis) {
  const pairs = await drainHash(redis, DEAL_SAVE_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const dedup = new Map()
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
const key = `${parsed.dealId}:${parsed.userId}`
const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date()
const existing = dedup.get(key)
if (!existing || createdAt > existing.createdAt) {
dedup.set(key, {
dealId: Number(parsed.dealId),
userId: Number(parsed.userId),
action: String(parsed.action || "SAVE").toUpperCase(),
createdAt,
})
}
} catch (err) {
console.error("db-sync dealSave parse failed:", err?.message || err)
}
}
const items = Array.from(dedup.values())
if (!items.length) return 0
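  // Dedup kept only the newest action per (deal, user), so each pair lands
  // in exactly one of the two lists; skipDuplicates keeps save replays
  // idempotent.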
const saves = items.filter((i) => i.action === "SAVE")
const unsaves = items.filter((i) => i.action === "UNSAVE")
try {
await prisma.$transaction(async (tx) => {
if (saves.length) {
await tx.dealSave.createMany({
data: saves.map((i) => ({
dealId: i.dealId,
userId: i.userId,
createdAt: i.createdAt,
})),
skipDuplicates: true,
})
}
if (unsaves.length) {
await tx.dealSave.deleteMany({
where: {
OR: unsaves.map((i) => ({
dealId: i.dealId,
userId: i.userId,
})),
},
})
}
})
} catch (err) {
console.error("db-sync dealSave batch failed:", err?.message || err)
return 0
}
return items.length
}
async function consumeAuditEvents(redis) {
  const pairs = await drainHash(redis, AUDIT_HASH_KEY)
  if (!Object.keys(pairs).length) return 0
const items = []
for (const payload of Object.values(pairs)) {
try {
const parsed = JSON.parse(payload)
if (!parsed?.action) continue
items.push({
userId: parsed.userId ? Number(parsed.userId) : null,
action: String(parsed.action),
ip: parsed.ip ?? null,
userAgent: parsed.userAgent ?? null,
meta: parsed.meta ?? null,
createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(),
})
} catch (err) {
console.error("db-sync audit parse failed:", err?.message || err)
}
}
if (!items.length) return 0
try {
await prisma.auditEvent.createMany({ data: items })
} catch (err) {
console.error("db-sync audit batch failed:", err?.message || err)
return 0
}
return items.length
}
async function handler() {
const redis = createRedisClient()
try {
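    // Drain order is significant where rows depend on each other: comment
    // creates run before comment likes/deletes, and deal creates before AI
    // reviews, deal updates, and votes, so dependent writes find their rows.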
const commentCreateCount = await consumeCommentCreates(redis)
const commentLikeCount = await consumeCommentLikeUpdates(redis)
const commentDeleteCount = await consumeCommentDeletes(redis)
const dealSaveCount = await consumeDealSaveUpdates(redis)
const dealEventCount = await consumeDealAnalyticsTotals(redis)
const dealCreateCount = await consumeDealCreates(redis)
const dealAiReviewCount = await consumeDealAiReviewUpdates(redis)
const notificationReadCount = await consumeNotificationReads(redis)
const notificationCount = await consumeNotifications(redis)
const dealUpdateCount = await consumeDealUpdates(redis)
const voteCount = await consumeVoteUpdates(redis)
const auditCount = await consumeAuditEvents(redis)
const userUpdateCount = await consumeUserUpdates(redis)
const userNoteCount = await consumeUserNotes(redis)
const dealReportUpdateCount = await consumeDealReportUpdates(redis)
const categoryUpsertCount = await consumeCategoryUpserts(redis)
const sellerUpsertCount = await consumeSellerUpserts(redis)
const sellerDomainUpsertCount = await consumeSellerDomainUpserts(redis)
const userInterestCount = await consumeUserInterestIncrements(redis)
return {
votes: voteCount,
commentLikes: commentLikeCount,
commentsCreated: commentCreateCount,
commentsDeleted: commentDeleteCount,
dealSaves: dealSaveCount,
dealEvents: dealEventCount,
dealCreates: dealCreateCount,
dealAiReviews: dealAiReviewCount,
notificationsRead: notificationReadCount,
notifications: notificationCount,
dealUpdates: dealUpdateCount,
audits: auditCount,
userUpdates: userUpdateCount,
userNotes: userNoteCount,
dealReportUpdates: dealReportUpdateCount,
categoryUpserts: categoryUpsertCount,
sellerUpserts: sellerUpsertCount,
sellerDomainUpserts: sellerDomainUpsertCount,
userInterests: userInterestCount,
}
} finally {
redis.disconnect()
}
}
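// Single-concurrency BullMQ worker: at most one drain runs at a time, so
// consumers never race each other over the same hashes.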
function startDbSyncWorker() {
const worker = new Worker("db-sync", handler, {
connection: getRedisConnectionOptions(),
concurrency: 1,
})
worker.on("completed", (job) => {
console.log(
`DB sync batch done. Votes: ${job.returnvalue?.votes ?? 0} CommentLikes: ${job.returnvalue?.commentLikes ?? 0} CommentsCreated: ${job.returnvalue?.commentsCreated ?? 0} CommentsDeleted: ${job.returnvalue?.commentsDeleted ?? 0} DealSaves: ${job.returnvalue?.dealSaves ?? 0} DealEvents: ${job.returnvalue?.dealEvents ?? 0} DealCreates: ${job.returnvalue?.dealCreates ?? 0} DealAiReviews: ${job.returnvalue?.dealAiReviews ?? 0} NotificationsRead: ${job.returnvalue?.notificationsRead ?? 0} Notifications: ${job.returnvalue?.notifications ?? 0} DealUpdates: ${job.returnvalue?.dealUpdates ?? 0} Audits: ${job.returnvalue?.audits ?? 0} UserUpdates: ${job.returnvalue?.userUpdates ?? 0} UserNotes: ${job.returnvalue?.userNotes ?? 0} DealReportUpdates: ${job.returnvalue?.dealReportUpdates ?? 0} CategoryUpserts: ${job.returnvalue?.categoryUpserts ?? 0} SellerUpserts: ${job.returnvalue?.sellerUpserts ?? 0} SellerDomainUpserts: ${job.returnvalue?.sellerDomainUpserts ?? 0} UserInterests: ${job.returnvalue?.userInterests ?? 0}`
)
})
worker.on("failed", (job, err) => {
console.error(`❌ DB sync batch failed: ${job?.id}`, err?.message)
})
return worker
}
module.exports = { startDbSyncWorker }