create job to reimport messages

This commit is contained in:
bosiraphael
2024-09-18 16:42:38 +02:00
parent 1048fe5500
commit ee630e04ce
5 changed files with 99 additions and 6 deletions

View File

@@ -12,7 +12,6 @@ import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-full-message-list-fetch.service';
import { MessagingPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/services/messaging-partial-message-list-fetch.service';
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
@@ -36,7 +35,6 @@ export class MessagingMessageListFetchJob {
private readonly connectedAccountRepository: ConnectedAccountRepository,
private readonly messagingTelemetryService: MessagingTelemetryService,
private readonly twentyORMManager: TwentyORMManager,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
) {}
@Process(MessagingMessageListFetchJob.name)

View File

@@ -12,7 +12,6 @@ import {
MessageChannelSyncStage,
MessageChannelWorkspaceEntity,
} from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessageImportExceptionHandlerService } from 'src/modules/messaging/message-import-manager/services/message-import-exception-handler.service';
import { MessagingMessagesImportService } from 'src/modules/messaging/message-import-manager/services/messaging-messages-import.service';
import { MessagingTelemetryService } from 'src/modules/messaging/monitoring/services/messaging-telemetry.service';
@@ -32,7 +31,6 @@ export class MessagingMessagesImportJob {
private readonly messagingMessagesImportService: MessagingMessagesImportService,
private readonly messagingTelemetryService: MessagingTelemetryService,
private readonly twentyORMManager: TwentyORMManager,
private readonly messageImportErrorHandlerService: MessageImportExceptionHandlerService,
) {}
@Process(MessagingMessagesImportJob.name)

View File

@@ -0,0 +1,40 @@
import { Logger } from '@nestjs/common';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
import { MessagingCleanCacheJob } from 'src/modules/messaging/message-import-manager/jobs/messaging-clean-cache';
export type MessagingSettingsManagerReimportMessagesJobData = {
workspaceId: string;
messageChannelIds: string[];
};
@Processor(MessageQueue.messagingQueue)
export class MessagingSettingsManagerReimportMessagesJob {
private readonly logger = new Logger(
MessagingSettingsManagerReimportMessagesJob.name,
);
constructor(
private readonly messageChannelSyncStatusService: MessageChannelSyncStatusService,
) {}
@Process(MessagingCleanCacheJob.name)
async handle(
data: MessagingSettingsManagerReimportMessagesJobData,
): Promise<void> {
const { workspaceId, messageChannelIds } = data;
this.logger.log(
`Reimporting messages for workspace ${workspaceId} and message channels ${messageChannelIds.join(
', ',
)}`,
);
await this.messageChannelSyncStatusService.scheduleFullMessageListFetch(
messageChannelIds,
);
}
}

View File

@@ -0,0 +1,50 @@
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event';
import { objectRecordChangedProperties } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-properties.util';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { MessagingSettingsManagerReimportMessagesJob } from 'src/modules/messaging/settings-manager/jobs/messaging-settings-manager-reimport-messages.job';
@Injectable()
export class MessagingSettingsManagerMessageChannelListener {
constructor(
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
@OnEvent('messageChannel.updated')
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<MessageChannelWorkspaceEntity>
>,
) {
const filteredEvents = payload.events.filter((eventPayload) =>
objectRecordChangedProperties(
eventPayload.properties.before,
eventPayload.properties.after,
).includes('excludeGroupEmails'),
);
const messageChannelsToReimport = filteredEvents
.filter(
(eventPayload) =>
eventPayload.properties.after.excludeGroupEmails === false,
)
.map((eventPayload) => eventPayload.properties.after.id);
if (messageChannelsToReimport.length) {
await this.messageQueueService.add(
MessagingSettingsManagerReimportMessagesJob.name,
{
workspaceId: payload.workspaceId,
messageChannelIds: messageChannelsToReimport,
},
);
}
}
}

View File

@@ -1,8 +1,15 @@
import { Module } from '@nestjs/common';
import { MessagingCommonModule } from 'src/modules/messaging/common/messaging-common.module';
import { MessagingSettingsManagerReimportMessagesJob } from 'src/modules/messaging/settings-manager/jobs/messaging-settings-manager-reimport-messages.job';
import { MessagingSettingsManagerMessageChannelListener } from 'src/modules/messaging/settings-manager/listeners/messaging-settings-manager-message-channel.listener';
@Module({
imports: [],
providers: [],
imports: [MessagingCommonModule],
providers: [
MessagingSettingsManagerMessageChannelListener,
MessagingSettingsManagerReimportMessagesJob,
],
exports: [],
})
export class MessagingSettingsManagerModule {}