const { getRedisClient } = require("./client") const prisma = require("../../db/client") const voteDb = require("../../db/vote.db") const commentLikeDb = require("../../db/commentLike.db") const dealAnalyticsDb = require("../../db/dealAnalytics.db") const dealSaveDb = require("../../db/dealSave.db") const VOTE_HASH_KEY = "bull:dbsync:votes" const COMMENT_LIKE_HASH_KEY = "bull:dbsync:commentLikes" const COMMENT_HASH_KEY = "bull:dbsync:comments" const COMMENT_DELETE_HASH_KEY = "bull:dbsync:commentDeletes" const DEAL_UPDATE_HASH_KEY = "bull:dbsync:dealUpdates" const DEAL_CREATE_HASH_KEY = "bull:dbsync:dealCreates" const DEAL_AI_REVIEW_HASH_KEY = "bull:dbsync:dealAiReviews" const NOTIFICATION_HASH_KEY = "bull:dbsync:notifications" const NOTIFICATION_READ_HASH_KEY = "bull:dbsync:notificationReads" const DEAL_SAVE_HASH_KEY = "bull:dbsync:dealSaves" const AUDIT_HASH_KEY = "bull:dbsync:audits" const USER_UPDATE_HASH_KEY = "bull:dbsync:users" const USER_NOTE_HASH_KEY = "bull:dbsync:userNotes" const DEAL_REPORT_UPDATE_HASH_KEY = "bull:dbsync:dealReportUpdates" const CATEGORY_UPSERT_HASH_KEY = "bull:dbsync:categoryUpserts" const SELLER_UPSERT_HASH_KEY = "bull:dbsync:sellerUpserts" const SELLER_DOMAIN_UPSERT_HASH_KEY = "bull:dbsync:sellerDomainUpserts" const DEAL_ANALYTICS_TOTAL_HASH_KEY = "bull:dbsync:dealAnalyticsTotals" function createRedisClient() { return getRedisClient() } async function tryQueue({ redisAction, fallbackAction, label }) { try { await redisAction() return { queued: true } } catch (err) { if (fallbackAction) { try { await fallbackAction() return { queued: false, fallback: true } } catch (fallbackErr) { console.error(`[dbsync-fallback] ${label || "unknown"} failed:`, fallbackErr?.message || fallbackErr) } } console.error(`[dbsync-queue] ${label || "unknown"} failed:`, err?.message || err) return { queued: false, fallback: false } } } const DEAL_UPDATE_FIELDS = new Set([ "title", "description", "url", "price", "originalPrice", "shippingPrice", "couponCode", "location", "discountType", "discountValue", "barcodeId", "maxNotifiedMilestone", "status", "saletype", "affiliateType", "sellerId", "customSeller", "categoryId", "userId", ]) function sanitizeDealUpdate(data) { const patch = {} if (!data || typeof data !== "object") return patch for (const [key, value] of Object.entries(data)) { if (!DEAL_UPDATE_FIELDS.has(key)) continue patch[key] = value } return patch } function normalizeDealCreateData(data = {}) { return { id: Number(data.id), title: String(data.title || ""), description: data.description ?? null, url: data.url ?? null, price: data.price ?? null, originalPrice: data.originalPrice ?? null, shippingPrice: data.shippingPrice ?? null, percentOff: data.percentOff ?? null, couponCode: data.couponCode ?? null, location: data.location ?? null, discountType: data.discountType ?? null, discountValue: data.discountValue ?? null, barcodeId: data.barcodeId ?? null, maxNotifiedMilestone: Number.isFinite(Number(data.maxNotifiedMilestone)) ? Number(data.maxNotifiedMilestone) : 0, userId: Number(data.userId), status: String(data.status || "PENDING"), saletype: String(data.saletype || "ONLINE"), affiliateType: String(data.affiliateType || "NON_AFFILIATE"), sellerId: data.sellerId ? Number(data.sellerId) : null, customSeller: data.customSeller ?? null, categoryId: Number.isInteger(Number(data.categoryId)) ? Number(data.categoryId) : 0, createdAt: data.createdAt ? new Date(data.createdAt) : new Date(), updatedAt: data.updatedAt ? new Date(data.updatedAt) : new Date(), } } async function queueVoteUpdate({ dealId, userId, voteType, createdAt }) { if (!dealId || !userId) return const redis = createRedisClient() const field = `vote:${dealId}:${userId}` const payload = JSON.stringify({ dealId: Number(dealId), userId: Number(userId), voteType: Number(voteType), createdAt, }) await tryQueue({ label: "vote", redisAction: () => redis.hset(VOTE_HASH_KEY, field, payload), fallbackAction: () => voteDb.voteDealTx({ dealId: Number(dealId), userId: Number(userId), voteType: Number(voteType), createdAt, }), }) } async function queueCommentLikeUpdate({ commentId, userId, like, createdAt }) { if (!commentId || !userId) return const redis = createRedisClient() const field = `commentLike:${commentId}:${userId}` const payload = JSON.stringify({ commentId: Number(commentId), userId: Number(userId), like: Boolean(like), createdAt, }) await tryQueue({ label: "comment-like", redisAction: () => redis.hset(COMMENT_LIKE_HASH_KEY, field, payload), fallbackAction: () => commentLikeDb.setCommentLike({ commentId: Number(commentId), userId: Number(userId), like: Boolean(like), }), }) } async function queueCommentCreate({ commentId, dealId, userId, text, parentId, createdAt }) { if (!commentId || !dealId || !userId) return const redis = createRedisClient() const field = `comment:${commentId}` const payload = JSON.stringify({ commentId: Number(commentId), dealId: Number(dealId), userId: Number(userId), text: String(text || ""), parentId: parentId ? Number(parentId) : null, createdAt, }) await tryQueue({ label: "comment-create", redisAction: () => redis.hset(COMMENT_HASH_KEY, field, payload), fallbackAction: async () => { await prisma.$transaction(async (tx) => { await tx.comment.create({ data: { id: Number(commentId), dealId: Number(dealId), userId: Number(userId), text: String(text || ""), parentId: parentId ? Number(parentId) : null, createdAt: createdAt ? new Date(createdAt) : new Date(), }, }) await tx.deal.update({ where: { id: Number(dealId) }, data: { commentCount: { increment: 1 } }, }) }) }, }) } async function queueCommentDelete({ commentId, dealId, createdAt }) { if (!commentId || !dealId) return const redis = createRedisClient() const field = `commentDelete:${commentId}` const payload = JSON.stringify({ commentId: Number(commentId), dealId: Number(dealId), createdAt, }) await tryQueue({ label: "comment-delete", redisAction: () => redis.hset(COMMENT_DELETE_HASH_KEY, field, payload), fallbackAction: async () => { await prisma.$transaction(async (tx) => { const result = await tx.comment.updateMany({ where: { id: Number(commentId), deletedAt: null }, data: { deletedAt: new Date() }, }) if (result.count > 0) { await tx.deal.update({ where: { id: Number(dealId) }, data: { commentCount: { decrement: 1 } }, }) } }) }, }) } async function queueDealUpdate({ dealId, data, updatedAt }) { if (!dealId || !data || typeof data !== "object") return const redis = createRedisClient() const field = `dealUpdate:${dealId}` const payload = JSON.stringify({ dealId: Number(dealId), data, updatedAt, }) const patch = sanitizeDealUpdate(data) await tryQueue({ label: "deal-update", redisAction: () => redis.hset(DEAL_UPDATE_HASH_KEY, field, payload), fallbackAction: async () => { if (!Object.keys(patch).length) return await prisma.deal.update({ where: { id: Number(dealId) }, data: { ...patch, updatedAt: updatedAt ? new Date(updatedAt) : new Date() }, }) }, }) } async function queueDealCreate({ dealId, data, images = [], createdAt }) { if (!dealId || !data || typeof data !== "object") return const redis = createRedisClient() const field = `dealCreate:${dealId}` const payload = JSON.stringify({ dealId: Number(dealId), data, images: Array.isArray(images) ? images : [], createdAt, }) await tryQueue({ label: "deal-create", redisAction: () => redis.hset(DEAL_CREATE_HASH_KEY, field, payload), fallbackAction: async () => { const normalized = normalizeDealCreateData({ ...data, id: dealId }) await prisma.deal.create({ data: normalized }) await dealAnalyticsDb.ensureTotalsForDealIds([Number(dealId)]) if (Array.isArray(images) && images.length) { const imagesData = images.map((img) => ({ dealId: Number(dealId), imageUrl: String(img.imageUrl || ""), order: Number(img.order || 0), })) await prisma.dealImage.createMany({ data: imagesData }) } await prisma.$executeRawUnsafe( 'SELECT setval(pg_get_serial_sequence(\'"Deal"\', \'id\'), (SELECT MAX(id) FROM "Deal"))' ) }, }) } async function queueDealAiReviewUpdate({ dealId, data, updatedAt }) { if (!dealId || !data || typeof data !== "object") return const redis = createRedisClient() const field = `dealAiReview:${dealId}` const payload = JSON.stringify({ dealId: Number(dealId), data, updatedAt, }) await tryQueue({ label: "deal-ai-review", redisAction: () => redis.hset(DEAL_AI_REVIEW_HASH_KEY, field, payload), fallbackAction: () => prisma.dealAiReview.upsert({ where: { dealId: Number(dealId) }, create: { dealId: Number(dealId), bestCategoryId: Number(data.bestCategoryId) || 0, tags: Array.isArray(data.tags) ? data.tags : [], needsReview: Boolean(data.needsReview), hasIssue: Boolean(data.hasIssue), issueType: String(data.issueType || "NONE"), issueReason: data.issueReason ?? null, }, update: { bestCategoryId: Number(data.bestCategoryId) || 0, tags: Array.isArray(data.tags) ? data.tags : [], needsReview: Boolean(data.needsReview), hasIssue: Boolean(data.hasIssue), issueType: String(data.issueType || "NONE"), issueReason: data.issueReason ?? null, }, }), }) } async function queueNotificationCreate({ userId, message, type = "INFO", createdAt }) { if (!userId || !message) return const redis = createRedisClient() const field = `notification:${userId}:${Date.now()}` const payload = JSON.stringify({ userId: Number(userId), message: String(message), type: String(type || "INFO"), createdAt, }) await tryQueue({ label: "notification-create", redisAction: () => redis.hset(NOTIFICATION_HASH_KEY, field, payload), fallbackAction: async () => { await prisma.$transaction(async (tx) => { await tx.notification.create({ data: { userId: Number(userId), message: String(message), type: String(type || "INFO"), createdAt: createdAt ? new Date(createdAt) : new Date(), }, }) await tx.user.update({ where: { id: Number(userId) }, data: { notificationCount: { increment: 1 } }, }) }) }, }) } async function queueNotificationReadAll({ userId, readAt }) { if (!userId) return const redis = createRedisClient() const field = `notificationRead:${userId}:${Date.now()}` const payload = JSON.stringify({ userId: Number(userId), readAt, }) await tryQueue({ label: "notification-read", redisAction: () => redis.hset(NOTIFICATION_READ_HASH_KEY, field, payload), fallbackAction: async () => { const readAtDate = readAt ? new Date(readAt) : new Date() await prisma.notification.updateMany({ where: { userId: Number(userId), readAt: null, createdAt: { lte: readAtDate }, }, data: { readAt: readAtDate }, }) }, }) } async function queueDealSaveUpdate({ dealId, userId, action, createdAt }) { if (!dealId || !userId) return const normalized = String(action || "").toUpperCase() if (!["SAVE", "UNSAVE"].includes(normalized)) return const redis = createRedisClient() const field = `dealSave:${dealId}:${userId}` const payload = JSON.stringify({ dealId: Number(dealId), userId: Number(userId), action: normalized, createdAt, }) await tryQueue({ label: "deal-save", redisAction: () => redis.hset(DEAL_SAVE_HASH_KEY, field, payload), fallbackAction: async () => { if (normalized === "SAVE") { await dealSaveDb.upsertDealSave({ dealId: Number(dealId), userId: Number(userId), createdAt: createdAt ? new Date(createdAt) : new Date(), }) return } await dealSaveDb.deleteDealSave({ dealId: Number(dealId), userId: Number(userId) }) }, }) } async function queueAuditEvent({ userId, action, ip, userAgent, meta = null, createdAt }) { if (!action) return const redis = createRedisClient() const field = `audit:${Date.now()}:${Math.random().toString(36).slice(2, 8)}` const payload = JSON.stringify({ userId: userId ? Number(userId) : null, action: String(action), ip: ip ?? null, userAgent: userAgent ?? null, meta, createdAt, }) await tryQueue({ label: "audit", redisAction: () => redis.hset(AUDIT_HASH_KEY, field, payload), fallbackAction: () => prisma.auditEvent.create({ data: { userId: userId ? Number(userId) : null, action: String(action), ip: ip ?? null, userAgent: userAgent ?? null, meta, createdAt: createdAt ? new Date(createdAt) : new Date(), }, }), }) } async function queueUserUpdate({ userId, data, updatedAt }) { if (!userId || !data || typeof data !== "object") return const redis = createRedisClient() const field = `userUpdate:${userId}` const payload = JSON.stringify({ userId: Number(userId), data, updatedAt, }) await tryQueue({ label: "user-update", redisAction: () => redis.hset(USER_UPDATE_HASH_KEY, field, payload), fallbackAction: () => prisma.user.update({ where: { id: Number(userId) }, data: { ...data, updatedAt: updatedAt ? new Date(updatedAt) : new Date() }, }), }) } async function queueUserNoteCreate({ userId, createdById, note, createdAt }) { if (!userId || !createdById || !note) return const redis = createRedisClient() const field = `userNote:${userId}:${Date.now()}` const payload = JSON.stringify({ userId: Number(userId), createdById: Number(createdById), note: String(note), createdAt, }) await tryQueue({ label: "user-note", redisAction: () => redis.hset(USER_NOTE_HASH_KEY, field, payload), fallbackAction: () => prisma.userNote.create({ data: { userId: Number(userId), createdById: Number(createdById), note: String(note), createdAt: createdAt ? new Date(createdAt) : new Date(), }, }), }) } async function queueDealReportStatusUpdate({ reportId, status, updatedAt }) { if (!reportId || !status) return const redis = createRedisClient() const field = `dealReport:${reportId}` const payload = JSON.stringify({ reportId: Number(reportId), status: String(status), updatedAt, }) await tryQueue({ label: "deal-report", redisAction: () => redis.hset(DEAL_REPORT_UPDATE_HASH_KEY, field, payload), fallbackAction: () => prisma.dealReport.update({ where: { id: Number(reportId) }, data: { status: String(status), updatedAt: updatedAt ? new Date(updatedAt) : new Date() }, }), }) } async function queueCategoryUpsert({ categoryId, data, updatedAt }) { if (!categoryId || !data || typeof data !== "object") return const redis = createRedisClient() const field = `category:${categoryId}` const payload = JSON.stringify({ categoryId: Number(categoryId), data, updatedAt, }) await tryQueue({ label: "category-upsert", redisAction: () => redis.hset(CATEGORY_UPSERT_HASH_KEY, field, payload), fallbackAction: () => prisma.category.upsert({ where: { id: Number(categoryId) }, create: { id: Number(categoryId), ...data }, update: data, }), }) } async function queueSellerUpsert({ sellerId, data, updatedAt }) { if (!sellerId || !data || typeof data !== "object") return const redis = createRedisClient() const field = `seller:${sellerId}` const payload = JSON.stringify({ sellerId: Number(sellerId), data, updatedAt, }) await tryQueue({ label: "seller-upsert", redisAction: () => redis.hset(SELLER_UPSERT_HASH_KEY, field, payload), fallbackAction: () => prisma.seller.upsert({ where: { id: Number(sellerId) }, create: { id: Number(sellerId), ...data }, update: data, }), }) } async function queueSellerDomainUpsert({ sellerId, domain, createdById }) { if (!sellerId || !domain || !createdById) return const redis = createRedisClient() const normalizedDomain = String(domain).toLowerCase() const field = `sellerDomain:${sellerId}:${normalizedDomain}` const payload = JSON.stringify({ sellerId: Number(sellerId), domain: normalizedDomain, createdById: Number(createdById), }) await tryQueue({ label: "seller-domain-upsert", redisAction: () => redis.hset(SELLER_DOMAIN_UPSERT_HASH_KEY, field, payload), fallbackAction: () => prisma.sellerDomain.upsert({ where: { domain: normalizedDomain }, create: { domain: normalizedDomain, sellerId: Number(sellerId), createdById: Number(createdById), }, update: { sellerId: Number(sellerId) }, }), }) } module.exports = { queueVoteUpdate, queueCommentLikeUpdate, queueCommentCreate, queueCommentDelete, queueDealUpdate, queueDealCreate, queueDealAiReviewUpdate, queueNotificationCreate, queueNotificationReadAll, queueDealSaveUpdate, queueAuditEvent, queueUserUpdate, queueUserNoteCreate, queueDealReportStatusUpdate, queueCategoryUpsert, queueSellerUpsert, queueSellerDomainUpsert, COMMENT_HASH_KEY, COMMENT_DELETE_HASH_KEY, VOTE_HASH_KEY, COMMENT_LIKE_HASH_KEY, DEAL_UPDATE_HASH_KEY, DEAL_CREATE_HASH_KEY, DEAL_AI_REVIEW_HASH_KEY, NOTIFICATION_HASH_KEY, NOTIFICATION_READ_HASH_KEY, DEAL_SAVE_HASH_KEY, AUDIT_HASH_KEY, USER_UPDATE_HASH_KEY, USER_NOTE_HASH_KEY, DEAL_REPORT_UPDATE_HASH_KEY, CATEGORY_UPSERT_HASH_KEY, SELLER_UPSERT_HASH_KEY, SELLER_DOMAIN_UPSERT_HASH_KEY, DEAL_ANALYTICS_TOTAL_HASH_KEY, }