1065 lines
30 KiB
JavaScript
1065 lines
30 KiB
JavaScript
const { Worker } = require("bullmq")
|
|
const Redis = require("ioredis")
|
|
const { getRedisConnectionOptions } = require("../services/redis/connection")
|
|
const voteDb = require("../db/vote.db")
|
|
const {
|
|
VOTE_HASH_KEY,
|
|
COMMENT_LIKE_HASH_KEY,
|
|
COMMENT_HASH_KEY,
|
|
COMMENT_DELETE_HASH_KEY,
|
|
DEAL_UPDATE_HASH_KEY,
|
|
DEAL_CREATE_HASH_KEY,
|
|
DEAL_AI_REVIEW_HASH_KEY,
|
|
NOTIFICATION_HASH_KEY,
|
|
NOTIFICATION_READ_HASH_KEY,
|
|
DEAL_SAVE_HASH_KEY,
|
|
AUDIT_HASH_KEY,
|
|
USER_UPDATE_HASH_KEY,
|
|
USER_NOTE_HASH_KEY,
|
|
DEAL_REPORT_UPDATE_HASH_KEY,
|
|
CATEGORY_UPSERT_HASH_KEY,
|
|
SELLER_UPSERT_HASH_KEY,
|
|
SELLER_DOMAIN_UPSERT_HASH_KEY,
|
|
} = require("../services/redis/dbSync.service")
|
|
const {
|
|
DEAL_EVENT_HASH_KEY,
|
|
incrementDealAnalyticsTotalsInRedis,
|
|
} = require("../services/redis/dealAnalytics.service")
|
|
const commentLikeDb = require("../db/commentLike.db")
|
|
const dealAnalyticsDb = require("../db/dealAnalytics.db")
|
|
const prisma = require("../db/client")
|
|
|
|
function createRedisClient() {
|
|
return new Redis(getRedisConnectionOptions())
|
|
}
|
|
|
|
async function consumeUserUpdates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
USER_UPDATE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const dedup = new Map()
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
const id = Number(parsed.userId)
|
|
if (!id || !parsed?.data) continue
|
|
const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date()
|
|
const existing = dedup.get(id)
|
|
if (!existing || updatedAt > existing.updatedAt) {
|
|
dedup.set(id, { userId: id, data: parsed.data, updatedAt })
|
|
}
|
|
} catch (err) {
|
|
console.error("db-sync user update parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
const items = Array.from(dedup.values())
|
|
if (!items.length) return 0
|
|
|
|
let updated = 0
|
|
for (const item of items) {
|
|
try {
|
|
await prisma.user.update({
|
|
where: { id: item.userId },
|
|
data: { ...item.data, updatedAt: item.updatedAt },
|
|
})
|
|
updated += 1
|
|
} catch (err) {
|
|
console.error("db-sync user update failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
return updated
|
|
}
|
|
|
|
async function consumeUserNotes(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
USER_NOTE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.userId || !parsed?.createdById || !parsed?.note) continue
|
|
items.push({
|
|
userId: Number(parsed.userId),
|
|
createdById: Number(parsed.createdById),
|
|
note: String(parsed.note),
|
|
createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(),
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync user note parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
try {
|
|
await prisma.userNote.createMany({ data: items })
|
|
return items.length
|
|
} catch (err) {
|
|
console.error("db-sync user note batch failed:", err?.message || err)
|
|
return 0
|
|
}
|
|
}
|
|
|
|
async function consumeDealReportUpdates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
DEAL_REPORT_UPDATE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.reportId || !parsed?.status) continue
|
|
items.push({
|
|
reportId: Number(parsed.reportId),
|
|
status: String(parsed.status),
|
|
updatedAt: parsed.updatedAt ? new Date(parsed.updatedAt) : new Date(),
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync dealReport update parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
|
|
let updated = 0
|
|
for (const item of items) {
|
|
try {
|
|
await prisma.dealReport.update({
|
|
where: { id: item.reportId },
|
|
data: { status: item.status, updatedAt: item.updatedAt },
|
|
})
|
|
updated += 1
|
|
} catch (err) {
|
|
console.error("db-sync dealReport update failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
return updated
|
|
}
|
|
|
|
async function consumeCategoryUpserts(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
CATEGORY_UPSERT_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.categoryId || !parsed?.data) continue
|
|
items.push({
|
|
categoryId: Number(parsed.categoryId),
|
|
data: parsed.data,
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync category upsert parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
let updated = 0
|
|
for (const item of items) {
|
|
try {
|
|
await prisma.category.upsert({
|
|
where: { id: item.categoryId },
|
|
create: { id: item.categoryId, ...item.data },
|
|
update: item.data,
|
|
})
|
|
updated += 1
|
|
} catch (err) {
|
|
console.error("db-sync category upsert failed:", err?.message || err)
|
|
}
|
|
}
|
|
return updated
|
|
}
|
|
|
|
async function consumeSellerUpserts(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
SELLER_UPSERT_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.sellerId || !parsed?.data) continue
|
|
items.push({
|
|
sellerId: Number(parsed.sellerId),
|
|
data: parsed.data,
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync seller upsert parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
let updated = 0
|
|
for (const item of items) {
|
|
try {
|
|
await prisma.seller.upsert({
|
|
where: { id: item.sellerId },
|
|
create: { id: item.sellerId, ...item.data },
|
|
update: item.data,
|
|
})
|
|
updated += 1
|
|
} catch (err) {
|
|
console.error("db-sync seller upsert failed:", err?.message || err)
|
|
}
|
|
}
|
|
return updated
|
|
}
|
|
|
|
async function consumeSellerDomainUpserts(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
SELLER_DOMAIN_UPSERT_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.sellerId || !parsed?.domain || !parsed?.createdById) continue
|
|
items.push({
|
|
sellerId: Number(parsed.sellerId),
|
|
domain: String(parsed.domain).toLowerCase(),
|
|
createdById: Number(parsed.createdById),
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync seller domain parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
let created = 0
|
|
for (const item of items) {
|
|
try {
|
|
await prisma.sellerDomain.upsert({
|
|
where: { domain: item.domain },
|
|
create: {
|
|
domain: item.domain,
|
|
sellerId: item.sellerId,
|
|
createdById: item.createdById,
|
|
},
|
|
update: { sellerId: item.sellerId },
|
|
})
|
|
created += 1
|
|
} catch (err) {
|
|
console.error("db-sync seller domain upsert failed:", err?.message || err)
|
|
}
|
|
}
|
|
return created
|
|
}
|
|
|
|
async function consumeVoteUpdates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
VOTE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const dedup = new Map()
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
const key = `${parsed.dealId}:${parsed.userId}`
|
|
const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date()
|
|
const existing = dedup.get(key)
|
|
if (!existing || createdAt > existing.createdAt) {
|
|
dedup.set(key, {
|
|
dealId: Number(parsed.dealId),
|
|
userId: Number(parsed.userId),
|
|
voteType: Number(parsed.voteType),
|
|
createdAt,
|
|
})
|
|
}
|
|
} catch (err) {
|
|
console.error("db-sync vote parse/update failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
const batch = Array.from(dedup.values())
|
|
if (!batch.length) return 0
|
|
const result = await voteDb.voteDealBatchTx(batch)
|
|
return result?.count ?? batch.length
|
|
}
|
|
|
|
async function consumeCommentLikeUpdates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
COMMENT_LIKE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const dedup = new Map()
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
const key = `${parsed.commentId}:${parsed.userId}`
|
|
const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date()
|
|
const existing = dedup.get(key)
|
|
if (!existing || createdAt > existing.createdAt) {
|
|
dedup.set(key, {
|
|
commentId: Number(parsed.commentId),
|
|
userId: Number(parsed.userId),
|
|
like: Boolean(parsed.like),
|
|
createdAt,
|
|
})
|
|
}
|
|
} catch (err) {
|
|
console.error("db-sync commentLike parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
const batch = Array.from(dedup.values())
|
|
if (!batch.length) return 0
|
|
|
|
try {
|
|
const result = await commentLikeDb.applyCommentLikeBatch(batch)
|
|
return (result?.inserted || 0) + (result?.deleted || 0)
|
|
} catch (err) {
|
|
console.error("db-sync commentLike batch failed:", err?.message || err)
|
|
return 0
|
|
}
|
|
}
|
|
|
|
async function consumeCommentCreates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
COMMENT_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
items.push({
|
|
id: Number(parsed.commentId),
|
|
dealId: Number(parsed.dealId),
|
|
userId: Number(parsed.userId),
|
|
text: String(parsed.text || ""),
|
|
parentId: parsed.parentId ? Number(parsed.parentId) : null,
|
|
createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(),
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync comment parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
|
|
return prisma.$transaction(async (tx) => {
|
|
let created = 0
|
|
for (const item of items) {
|
|
try {
|
|
await tx.comment.create({
|
|
data: {
|
|
id: item.id,
|
|
dealId: item.dealId,
|
|
userId: item.userId,
|
|
text: item.text,
|
|
parentId: item.parentId,
|
|
createdAt: item.createdAt,
|
|
},
|
|
})
|
|
await tx.deal.update({
|
|
where: { id: item.dealId },
|
|
data: { commentCount: { increment: 1 } },
|
|
})
|
|
created += 1
|
|
} catch (err) {
|
|
// ignore duplicates
|
|
}
|
|
}
|
|
return created
|
|
})
|
|
}
|
|
|
|
async function consumeCommentDeletes(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
COMMENT_DELETE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
items.push({
|
|
commentId: Number(parsed.commentId),
|
|
dealId: Number(parsed.dealId),
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync comment delete parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
|
|
return prisma.$transaction(async (tx) => {
|
|
let deleted = 0
|
|
for (const item of items) {
|
|
const result = await tx.comment.updateMany({
|
|
where: { id: item.commentId, deletedAt: null },
|
|
data: { deletedAt: new Date() },
|
|
})
|
|
if (result.count > 0) {
|
|
await tx.deal.update({
|
|
where: { id: item.dealId },
|
|
data: { commentCount: { decrement: 1 } },
|
|
})
|
|
deleted += 1
|
|
}
|
|
}
|
|
return deleted
|
|
})
|
|
}
|
|
|
|
async function consumeDealEvents(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
DEAL_EVENT_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const events = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.dealId || (!parsed?.userId && !parsed?.ip)) continue
|
|
events.push({
|
|
dealId: Number(parsed.dealId),
|
|
type: String(parsed.type || "IMPRESSION").toUpperCase(),
|
|
userId: parsed.userId ? Number(parsed.userId) : null,
|
|
ip: parsed.ip ? String(parsed.ip) : null,
|
|
createdAt: parsed.createdAt,
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync dealEvent parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!events.length) return 0
|
|
|
|
try {
|
|
const result = await dealAnalyticsDb.applyDealEventBatch(events)
|
|
await incrementDealAnalyticsTotalsInRedis(result?.increments || [])
|
|
return result?.inserted ?? 0
|
|
} catch (err) {
|
|
console.error("db-sync dealEvent batch failed:", err?.message || err)
|
|
return 0
|
|
}
|
|
}
|
|
|
|
function normalizeDealCreateData(data = {}) {
|
|
return {
|
|
id: Number(data.id),
|
|
title: String(data.title || ""),
|
|
description: data.description ?? null,
|
|
url: data.url ?? null,
|
|
price: data.price ?? null,
|
|
originalPrice: data.originalPrice ?? null,
|
|
shippingPrice: data.shippingPrice ?? null,
|
|
percentOff: data.percentOff ?? null,
|
|
couponCode: data.couponCode ?? null,
|
|
location: data.location ?? null,
|
|
discountType: data.discountType ?? null,
|
|
discountValue: data.discountValue ?? null,
|
|
maxNotifiedMilestone: Number.isFinite(Number(data.maxNotifiedMilestone))
|
|
? Number(data.maxNotifiedMilestone)
|
|
: 0,
|
|
userId: Number(data.userId),
|
|
status: String(data.status || "PENDING"),
|
|
saletype: String(data.saletype || "ONLINE"),
|
|
affiliateType: String(data.affiliateType || "NON_AFFILIATE"),
|
|
sellerId: data.sellerId ? Number(data.sellerId) : null,
|
|
customSeller: data.customSeller ?? null,
|
|
categoryId: Number.isInteger(Number(data.categoryId)) ? Number(data.categoryId) : 0,
|
|
createdAt: data.createdAt ? new Date(data.createdAt) : new Date(),
|
|
updatedAt: data.updatedAt ? new Date(data.updatedAt) : new Date(),
|
|
}
|
|
}
|
|
|
|
async function consumeDealCreates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
DEAL_CREATE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.dealId || !parsed?.data) continue
|
|
items.push({
|
|
dealId: Number(parsed.dealId),
|
|
data: normalizeDealCreateData(parsed.data),
|
|
images: Array.isArray(parsed.images) ? parsed.images : [],
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync deal create parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
|
|
let created = 0
|
|
for (const item of items) {
|
|
try {
|
|
const data = { ...item.data, id: item.dealId }
|
|
await prisma.deal.create({ data })
|
|
await dealAnalyticsDb.ensureTotalsForDealIds([item.dealId])
|
|
if (item.images.length) {
|
|
const imagesData = item.images.map((img) => ({
|
|
dealId: item.dealId,
|
|
imageUrl: String(img.imageUrl || ""),
|
|
order: Number(img.order || 0),
|
|
}))
|
|
await prisma.dealImage.createMany({ data: imagesData })
|
|
}
|
|
created += 1
|
|
} catch (err) {
|
|
console.error("db-sync deal create failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (created > 0) {
|
|
await prisma.$executeRawUnsafe(
|
|
'SELECT setval(pg_get_serial_sequence(\'"Deal"\', \'id\'), (SELECT MAX(id) FROM "Deal"))'
|
|
)
|
|
}
|
|
|
|
return created
|
|
}
|
|
|
|
async function consumeDealAiReviewUpdates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
DEAL_AI_REVIEW_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const dedup = new Map()
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
const dealId = Number(parsed.dealId)
|
|
if (!dealId || !parsed?.data) continue
|
|
const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date()
|
|
const existing = dedup.get(dealId)
|
|
if (!existing || updatedAt > existing.updatedAt) {
|
|
dedup.set(dealId, {
|
|
dealId,
|
|
data: parsed.data,
|
|
updatedAt,
|
|
})
|
|
}
|
|
} catch (err) {
|
|
console.error("db-sync deal aiReview parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
const batch = Array.from(dedup.values())
|
|
if (!batch.length) return 0
|
|
|
|
let updated = 0
|
|
for (const item of batch) {
|
|
try {
|
|
await prisma.dealAiReview.upsert({
|
|
where: { dealId: item.dealId },
|
|
create: {
|
|
dealId: item.dealId,
|
|
bestCategoryId: Number(item.data.bestCategoryId) || 0,
|
|
tags: Array.isArray(item.data.tags) ? item.data.tags : [],
|
|
needsReview: Boolean(item.data.needsReview),
|
|
hasIssue: Boolean(item.data.hasIssue),
|
|
issueType: String(item.data.issueType || "NONE"),
|
|
issueReason: item.data.issueReason ?? null,
|
|
},
|
|
update: {
|
|
bestCategoryId: Number(item.data.bestCategoryId) || 0,
|
|
tags: Array.isArray(item.data.tags) ? item.data.tags : [],
|
|
needsReview: Boolean(item.data.needsReview),
|
|
hasIssue: Boolean(item.data.hasIssue),
|
|
issueType: String(item.data.issueType || "NONE"),
|
|
issueReason: item.data.issueReason ?? null,
|
|
},
|
|
})
|
|
updated += 1
|
|
} catch (err) {
|
|
console.error("db-sync deal aiReview update failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
return updated
|
|
}
|
|
|
|
const DEAL_UPDATE_FIELDS = new Set([
|
|
"title",
|
|
"description",
|
|
"url",
|
|
"price",
|
|
"originalPrice",
|
|
"shippingPrice",
|
|
"couponCode",
|
|
"location",
|
|
"discountType",
|
|
"discountValue",
|
|
"maxNotifiedMilestone",
|
|
"status",
|
|
"saletype",
|
|
"affiliateType",
|
|
"sellerId",
|
|
"customSeller",
|
|
"categoryId",
|
|
"userId",
|
|
])
|
|
|
|
function sanitizeDealUpdate(data) {
|
|
const patch = {}
|
|
if (!data || typeof data !== "object") return patch
|
|
for (const [key, value] of Object.entries(data)) {
|
|
if (!DEAL_UPDATE_FIELDS.has(key)) continue
|
|
patch[key] = value
|
|
}
|
|
return patch
|
|
}
|
|
|
|
async function consumeDealUpdates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
DEAL_UPDATE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const dedup = new Map()
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
const id = Number(parsed.dealId)
|
|
if (!id) continue
|
|
const updatedAt = parsed.updatedAt ? new Date(parsed.updatedAt) : new Date()
|
|
const existing = dedup.get(id)
|
|
if (!existing || updatedAt > existing.updatedAt) {
|
|
dedup.set(id, {
|
|
dealId: id,
|
|
data: sanitizeDealUpdate(parsed.data),
|
|
updatedAt,
|
|
})
|
|
}
|
|
} catch (err) {
|
|
console.error("db-sync deal update parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
const batch = Array.from(dedup.values()).filter((item) => Object.keys(item.data).length)
|
|
if (!batch.length) return 0
|
|
|
|
let updated = 0
|
|
for (const item of batch) {
|
|
try {
|
|
await prisma.deal.update({
|
|
where: { id: item.dealId },
|
|
data: { ...item.data, updatedAt: item.updatedAt },
|
|
})
|
|
updated += 1
|
|
} catch (err) {
|
|
console.error("db-sync deal update failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
return updated
|
|
}
|
|
|
|
async function consumeNotifications(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
NOTIFICATION_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.userId || !parsed?.message) continue
|
|
items.push({
|
|
userId: Number(parsed.userId),
|
|
message: String(parsed.message),
|
|
type: String(parsed.type || "INFO"),
|
|
createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(),
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync notification parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
|
|
let created = 0
|
|
for (const item of items) {
|
|
try {
|
|
await prisma.$transaction(async (tx) => {
|
|
await tx.notification.create({
|
|
data: {
|
|
userId: item.userId,
|
|
message: item.message,
|
|
type: item.type,
|
|
createdAt: item.createdAt,
|
|
},
|
|
})
|
|
await tx.user.update({
|
|
where: { id: item.userId },
|
|
data: { notificationCount: { increment: 1 } },
|
|
})
|
|
})
|
|
created += 1
|
|
} catch (err) {
|
|
console.error("db-sync notification create failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
return created
|
|
}
|
|
|
|
async function consumeNotificationReads(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
NOTIFICATION_READ_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const dedup = new Map()
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.userId) continue
|
|
const readAt = parsed.readAt ? new Date(parsed.readAt) : new Date()
|
|
const existing = dedup.get(parsed.userId)
|
|
if (!existing || readAt > existing.readAt) {
|
|
dedup.set(parsed.userId, { userId: Number(parsed.userId), readAt })
|
|
}
|
|
} catch (err) {
|
|
console.error("db-sync notification read parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
const items = Array.from(dedup.values())
|
|
if (!items.length) return 0
|
|
|
|
let updated = 0
|
|
for (const item of items) {
|
|
try {
|
|
const result = await prisma.notification.updateMany({
|
|
where: {
|
|
userId: item.userId,
|
|
readAt: null,
|
|
createdAt: { lte: item.readAt },
|
|
},
|
|
data: { readAt: item.readAt },
|
|
})
|
|
updated += result?.count || 0
|
|
} catch (err) {
|
|
console.error("db-sync notification read update failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
return updated
|
|
}
|
|
|
|
async function consumeDealSaveUpdates(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
DEAL_SAVE_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const dedup = new Map()
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
const key = `${parsed.dealId}:${parsed.userId}`
|
|
const createdAt = parsed.createdAt ? new Date(parsed.createdAt) : new Date()
|
|
const existing = dedup.get(key)
|
|
if (!existing || createdAt > existing.createdAt) {
|
|
dedup.set(key, {
|
|
dealId: Number(parsed.dealId),
|
|
userId: Number(parsed.userId),
|
|
action: String(parsed.action || "SAVE").toUpperCase(),
|
|
createdAt,
|
|
})
|
|
}
|
|
} catch (err) {
|
|
console.error("db-sync dealSave parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
const items = Array.from(dedup.values())
|
|
if (!items.length) return 0
|
|
|
|
const saves = items.filter((i) => i.action === "SAVE")
|
|
const unsaves = items.filter((i) => i.action === "UNSAVE")
|
|
|
|
try {
|
|
await prisma.$transaction(async (tx) => {
|
|
if (saves.length) {
|
|
await tx.dealSave.createMany({
|
|
data: saves.map((i) => ({
|
|
dealId: i.dealId,
|
|
userId: i.userId,
|
|
createdAt: i.createdAt,
|
|
})),
|
|
skipDuplicates: true,
|
|
})
|
|
}
|
|
if (unsaves.length) {
|
|
await tx.dealSave.deleteMany({
|
|
where: {
|
|
OR: unsaves.map((i) => ({
|
|
dealId: i.dealId,
|
|
userId: i.userId,
|
|
})),
|
|
},
|
|
})
|
|
}
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync dealSave batch failed:", err?.message || err)
|
|
return 0
|
|
}
|
|
|
|
return items.length
|
|
}
|
|
|
|
async function consumeAuditEvents(redis) {
|
|
const data = await redis.eval(
|
|
"local data = redis.call('HGETALL', KEYS[1]); redis.call('DEL', KEYS[1]); return data;",
|
|
1,
|
|
AUDIT_HASH_KEY
|
|
)
|
|
if (!data || data.length === 0) return 0
|
|
|
|
const pairs = {}
|
|
for (let i = 0; i < data.length; i += 2) {
|
|
pairs[data[i]] = data[i + 1]
|
|
}
|
|
|
|
const items = []
|
|
for (const payload of Object.values(pairs)) {
|
|
try {
|
|
const parsed = JSON.parse(payload)
|
|
if (!parsed?.action) continue
|
|
items.push({
|
|
userId: parsed.userId ? Number(parsed.userId) : null,
|
|
action: String(parsed.action),
|
|
ip: parsed.ip ?? null,
|
|
userAgent: parsed.userAgent ?? null,
|
|
meta: parsed.meta ?? null,
|
|
createdAt: parsed.createdAt ? new Date(parsed.createdAt) : new Date(),
|
|
})
|
|
} catch (err) {
|
|
console.error("db-sync audit parse failed:", err?.message || err)
|
|
}
|
|
}
|
|
|
|
if (!items.length) return 0
|
|
|
|
try {
|
|
await prisma.auditEvent.createMany({ data: items })
|
|
} catch (err) {
|
|
console.error("db-sync audit batch failed:", err?.message || err)
|
|
return 0
|
|
}
|
|
|
|
return items.length
|
|
}
|
|
|
|
async function handler() {
|
|
const redis = createRedisClient()
|
|
|
|
try {
|
|
const commentCreateCount = await consumeCommentCreates(redis)
|
|
const commentLikeCount = await consumeCommentLikeUpdates(redis)
|
|
const commentDeleteCount = await consumeCommentDeletes(redis)
|
|
const dealSaveCount = await consumeDealSaveUpdates(redis)
|
|
const dealEventCount = await consumeDealEvents(redis)
|
|
const dealCreateCount = await consumeDealCreates(redis)
|
|
const dealAiReviewCount = await consumeDealAiReviewUpdates(redis)
|
|
const notificationReadCount = await consumeNotificationReads(redis)
|
|
const notificationCount = await consumeNotifications(redis)
|
|
const dealUpdateCount = await consumeDealUpdates(redis)
|
|
const voteCount = await consumeVoteUpdates(redis)
|
|
const auditCount = await consumeAuditEvents(redis)
|
|
const userUpdateCount = await consumeUserUpdates(redis)
|
|
const userNoteCount = await consumeUserNotes(redis)
|
|
const dealReportUpdateCount = await consumeDealReportUpdates(redis)
|
|
const categoryUpsertCount = await consumeCategoryUpserts(redis)
|
|
const sellerUpsertCount = await consumeSellerUpserts(redis)
|
|
const sellerDomainUpsertCount = await consumeSellerDomainUpserts(redis)
|
|
return {
|
|
votes: voteCount,
|
|
commentLikes: commentLikeCount,
|
|
commentsCreated: commentCreateCount,
|
|
commentsDeleted: commentDeleteCount,
|
|
dealSaves: dealSaveCount,
|
|
dealEvents: dealEventCount,
|
|
dealCreates: dealCreateCount,
|
|
dealAiReviews: dealAiReviewCount,
|
|
notificationsRead: notificationReadCount,
|
|
notifications: notificationCount,
|
|
dealUpdates: dealUpdateCount,
|
|
audits: auditCount,
|
|
userUpdates: userUpdateCount,
|
|
userNotes: userNoteCount,
|
|
dealReportUpdates: dealReportUpdateCount,
|
|
categoryUpserts: categoryUpsertCount,
|
|
sellerUpserts: sellerUpsertCount,
|
|
sellerDomainUpserts: sellerDomainUpsertCount,
|
|
}
|
|
} finally {
|
|
redis.disconnect()
|
|
}
|
|
}
|
|
|
|
function startDbSyncWorker() {
|
|
const worker = new Worker("db-sync", handler, {
|
|
connection: getRedisConnectionOptions(),
|
|
concurrency: 1,
|
|
})
|
|
|
|
worker.on("completed", (job) => {
|
|
console.log(
|
|
`✅ DB sync batch done. Votes: ${job.returnvalue?.votes ?? 0} CommentLikes: ${job.returnvalue?.commentLikes ?? 0} CommentsCreated: ${job.returnvalue?.commentsCreated ?? 0} CommentsDeleted: ${job.returnvalue?.commentsDeleted ?? 0} DealSaves: ${job.returnvalue?.dealSaves ?? 0} DealEvents: ${job.returnvalue?.dealEvents ?? 0} DealCreates: ${job.returnvalue?.dealCreates ?? 0} DealAiReviews: ${job.returnvalue?.dealAiReviews ?? 0} NotificationsRead: ${job.returnvalue?.notificationsRead ?? 0} Notifications: ${job.returnvalue?.notifications ?? 0} DealUpdates: ${job.returnvalue?.dealUpdates ?? 0} Audits: ${job.returnvalue?.audits ?? 0} UserUpdates: ${job.returnvalue?.userUpdates ?? 0} UserNotes: ${job.returnvalue?.userNotes ?? 0} DealReportUpdates: ${job.returnvalue?.dealReportUpdates ?? 0} CategoryUpserts: ${job.returnvalue?.categoryUpserts ?? 0} SellerUpserts: ${job.returnvalue?.sellerUpserts ?? 0} SellerDomainUpserts: ${job.returnvalue?.sellerDomainUpserts ?? 0}`
|
|
)
|
|
})
|
|
|
|
worker.on("failed", (job, err) => {
|
|
console.error(`❌ DB sync batch failed: ${job?.id}`, err?.message)
|
|
})
|
|
|
|
return worker
|
|
}
|
|
|
|
module.exports = { startDbSyncWorker }
|