diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts index 1b301dfe4..965116076 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/calendar-event-import-manager.module.ts @@ -10,15 +10,19 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module'; import { CalendarEventListFetchCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command'; +import { CalendarImportCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-import.cron.command'; import { CalendarOngoingStaleCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command'; import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job'; +import { CalendarImportCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job'; import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job'; import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module'; import { MicrosoftCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/microsoft-calendar-driver.module'; import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; +import { CalendarImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job'; import { CalendarOngoingStaleJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job'; import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service'; import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service'; +import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service'; import { CalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service'; import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service'; import { CalendarEventParticipantManagerModule } from 'src/modules/calendar/calendar-event-participant-manager/calendar-event-participant-manager.module'; @@ -50,16 +54,20 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta providers: [ CalendarChannelSyncStatusService, CalendarEventsImportService, + CalendarFetchEventsService, CalendarEventImportErrorHandlerService, CalendarGetCalendarEventsService, CalendarSaveEventsService, CalendarEventListFetchCronJob, CalendarEventListFetchCronCommand, CalendarEventListFetchJob, + CalendarImportCronJob, + CalendarImportCronCommand, + CalendarImportJob, CalendarOngoingStaleCronJob, CalendarOngoingStaleCronCommand, CalendarOngoingStaleJob, ], - exports: [CalendarEventsImportService], + exports: [CalendarEventsImportService, CalendarFetchEventsService], }) export class CalendarEventImportManagerModule {} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/constants/calendar-event-import-batch-size.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/constants/calendar-event-import-batch-size.ts new file mode 100644 index 000000000..a9d6e86b1 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/constants/calendar-event-import-batch-size.ts @@ -0,0 +1 @@ +export const CALENDAR_EVENT_IMPORT_BATCH_SIZE = 100; diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command.ts index d49131893..ea7099949 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command.ts @@ -3,10 +3,8 @@ import { Command, CommandRunner } from 'nest-commander'; import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; -import { - CALENDAR_EVENTS_IMPORT_CRON_PATTERN, - CalendarEventListFetchCronJob, -} from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job'; +import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job'; +import { CALENDAR_EVENTS_IMPORT_CRON_PATTERN } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job'; @Command({ name: 'cron:calendar:calendar-event-list-fetch', diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-import.cron.command.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-import.cron.command.ts new file mode 100644 index 000000000..b9a52d873 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-import.cron.command.ts @@ -0,0 +1,32 @@ +import { Command, CommandRunner } from 'nest-commander'; + +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { + CALENDAR_EVENTS_IMPORT_CRON_PATTERN, + CalendarImportCronJob, +} from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job'; + +@Command({ + name: 'cron:calendar:calendar-import', + description: 'Starts a cron job to import the calendar events', +}) +export class CalendarImportCronCommand extends CommandRunner { + constructor( + @InjectMessageQueue(MessageQueue.cronQueue) + private readonly messageQueueService: MessageQueueService, + ) { + super(); + } + + async run(): Promise { + await this.messageQueueService.addCron( + CalendarImportCronJob.name, + undefined, + { + repeat: { pattern: CALENDAR_EVENTS_IMPORT_CRON_PATTERN }, + }, + ); + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job.ts new file mode 100644 index 000000000..de695fd5c --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job.ts @@ -0,0 +1,87 @@ +import { InjectRepository } from '@nestjs/typeorm'; + +import { Equal, Repository } from 'typeorm'; + +import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator'; +import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service'; +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { + Workspace, + WorkspaceActivationStatus, +} from 'src/engine/core-modules/workspace/workspace.entity'; +import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager'; +import { CalendarEventsImportJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; +import { CalendarImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job'; +import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; + +export const CALENDAR_EVENTS_IMPORT_CRON_PATTERN = '*/5 * * * *'; + +@Processor({ + queueName: MessageQueue.cronQueue, +}) +export class CalendarImportCronJob { + constructor( + @InjectRepository(Workspace, 'core') + private readonly workspaceRepository: Repository, + @InjectMessageQueue(MessageQueue.calendarQueue) + private readonly messageQueueService: MessageQueueService, + private readonly twentyORMGlobalManager: TwentyORMGlobalManager, + private readonly exceptionHandlerService: ExceptionHandlerService, + ) {} + + @Process(CalendarImportCronJob.name) + @SentryCronMonitor( + CalendarImportCronJob.name, + CALENDAR_EVENTS_IMPORT_CRON_PATTERN, + ) + async handle(): Promise { + console.time('CalendarImportCronJob time'); + + const activeWorkspaces = await this.workspaceRepository.find({ + where: { + activationStatus: WorkspaceActivationStatus.ACTIVE, + }, + }); + + for (const activeWorkspace of activeWorkspaces) { + try { + const calendarChannelRepository = + await this.twentyORMGlobalManager.getRepositoryForWorkspace( + activeWorkspace.id, + 'calendarChannel', + ); + + const calendarChannels = await calendarChannelRepository.find({ + where: { + isSyncEnabled: true, + syncStage: Equal( + CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING, + ), + }, + }); + + for (const calendarChannel of calendarChannels) { + await this.messageQueueService.add( + CalendarImportJob.name, + { + calendarChannelId: calendarChannel.id, + workspaceId: activeWorkspace.id, + }, + ); + } + } catch (error) { + this.exceptionHandlerService.captureExceptions([error], { + user: { + workspaceId: activeWorkspace.id, + }, + }); + } + } + + console.timeEnd('CalendarImportCronJob time'); + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service.ts index a140b8bfb..c540a8b55 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/services/google-calendar-get-events.service.ts @@ -72,6 +72,7 @@ export class GoogleCalendarGetEventsService { } return { + fullEvents: true, calendarEvents: formatGoogleCalendarEvents(events), nextSyncCursor: nextSyncToken || '', }; diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/microsoft-calendar-driver.module.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/microsoft-calendar-driver.module.ts index ceb3eb283..def97bfb6 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/microsoft-calendar-driver.module.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/microsoft-calendar-driver.module.ts @@ -2,14 +2,19 @@ import { Module } from '@nestjs/common'; import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module'; import { MicrosoftCalendarGetEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-get-events.service'; +import { MicrosoftCalendarImportEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service'; import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service'; @Module({ imports: [EnvironmentModule], providers: [ MicrosoftCalendarGetEventsService, + MicrosoftCalendarImportEventsService, MicrosoftOAuth2ClientManagerService, ], - exports: [MicrosoftCalendarGetEventsService], + exports: [ + MicrosoftCalendarGetEventsService, + MicrosoftCalendarImportEventsService, + ], }) export class MicrosoftCalendarDriverModule {} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-get-events.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-get-events.service.ts index 46d3f53d4..ce9da8555 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-get-events.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-get-events.service.ts @@ -5,9 +5,7 @@ import { PageIterator, PageIteratorCallback, } from '@microsoft/microsoft-graph-client'; -import { Event } from '@microsoft/microsoft-graph-types'; -import { formatMicrosoftCalendarEvents } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/format-microsoft-calendar-event.util'; import { parseMicrosoftCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/parse-microsoft-calendar-error.util'; import { GetCalendarEventsResponse } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service'; import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service'; @@ -31,61 +29,34 @@ export class MicrosoftCalendarGetEventsService { await this.microsoftOAuth2ClientManagerService.getOAuth2Client( connectedAccount.refreshToken, ); + const eventIds: string[] = []; - const { changedEvents, nextSyncToken } = - await this.getChangedCalendarEventIds(connectedAccount, syncCursor); + const response: PageCollection = await microsoftClient + .api(syncCursor || '/me/calendar/events/delta') + .version('beta') + .get(); - const events: Event[] = []; + const callback: PageIteratorCallback = (data) => { + eventIds.push(data.id); - for (const changedEvent of changedEvents) { - const event = await microsoftClient - .api(`/me/calendar/events/${changedEvent.id}`) - .get(); + return true; + }; - events.push(event); - } + const pageIterator = new PageIterator( + microsoftClient, + response, + callback, + ); + + await pageIterator.iterate(); return { - calendarEvents: formatMicrosoftCalendarEvents(events), - nextSyncCursor: nextSyncToken || '', + fullEvents: false, + calendarEventIds: eventIds, + nextSyncCursor: pageIterator.getDeltaLink() || '', }; } catch (error) { throw parseMicrosoftCalendarError(error); } } - - private async getChangedCalendarEventIds( - connectedAccount: Pick< - ConnectedAccountWorkspaceEntity, - 'provider' | 'refreshToken' | 'id' - >, - syncCursor?: string, - ): Promise<{ - changedEvents: Pick[]; - nextSyncToken?: string; - }> { - const microsoftClient = - await this.microsoftOAuth2ClientManagerService.getOAuth2Client( - connectedAccount.refreshToken, - ); - const events: Event[] = []; - - const response: PageCollection = await microsoftClient - .api(syncCursor || '/me/calendar/events/delta') - .version('beta') - .get(); - const callback: PageIteratorCallback = (data) => { - events.push(data); - - return true; - }; - const pageIterator = new PageIterator(microsoftClient, response, callback); - - await pageIterator.iterate(); - - return { - changedEvents: events, - nextSyncToken: pageIterator.getDeltaLink(), - }; - } } diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service.ts new file mode 100644 index 000000000..7f6ac5223 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service.ts @@ -0,0 +1,45 @@ +import { Injectable } from '@nestjs/common'; + +import { Event } from '@microsoft/microsoft-graph-types'; + +import { formatMicrosoftCalendarEvents } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/format-microsoft-calendar-event.util'; +import { parseMicrosoftCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/parse-microsoft-calendar-error.util'; +import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event'; +import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; + +@Injectable() +export class MicrosoftCalendarImportEventsService { + constructor( + private readonly microsoftOAuth2ClientManagerService: MicrosoftOAuth2ClientManagerService, + ) {} + + public async getCalendarEvents( + connectedAccount: Pick< + ConnectedAccountWorkspaceEntity, + 'provider' | 'refreshToken' | 'id' + >, + changedEventIds: string[], + ): Promise { + try { + const microsoftClient = + await this.microsoftOAuth2ClientManagerService.getOAuth2Client( + connectedAccount.refreshToken, + ); + + const events: Event[] = []; + + for (const changedEventId of changedEventIds) { + const event = await microsoftClient + .api(`/me/calendar/events/${changedEventId}`) + .get(); + + events.push(event); + } + + return formatMicrosoftCalendarEvents(events); + } catch (error) { + throw parseMicrosoftCalendarError(error); + } + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job.ts index 1e28927e2..089bc59d2 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job.ts @@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service'; +import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service'; import { CalendarChannelSyncStage, CalendarChannelWorkspaceEntity, @@ -23,7 +24,7 @@ export type CalendarEventsImportJobData = { export class CalendarEventListFetchJob { constructor( private readonly twentyORMManager: TwentyORMManager, - private readonly calendarEventsImportService: CalendarEventsImportService, + private readonly calendarFetchEventsService: CalendarFetchEventsService, ) {} @Process(CalendarEventListFetchJob.name) @@ -65,7 +66,7 @@ export class CalendarEventListFetchJob { syncStageStartedAt: null, }); - await this.calendarEventsImportService.processCalendarEventsImport( + await this.calendarFetchEventsService.fetchCalendarEvents( calendarChannel, calendarChannel.connectedAccount, workspaceId, @@ -73,7 +74,7 @@ export class CalendarEventListFetchJob { break; case CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING: - await this.calendarEventsImportService.processCalendarEventsImport( + await this.calendarFetchEventsService.fetchCalendarEvents( calendarChannel, calendarChannel.connectedAccount, workspaceId, diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job.ts new file mode 100644 index 000000000..37d264cb1 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job.ts @@ -0,0 +1,75 @@ +import { Scope } from '@nestjs/common'; + +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service'; +import { + CalendarChannelSyncStage, + CalendarChannelWorkspaceEntity, +} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; +import { isThrottled } from 'src/modules/connected-account/utils/is-throttled'; + +export type CalendarEventsImportJobData = { + calendarChannelId: string; + workspaceId: string; +}; + +@Processor({ + queueName: MessageQueue.calendarQueue, + scope: Scope.REQUEST, +}) +export class CalendarImportJob { + constructor( + private readonly calendarEventsImportService: CalendarEventsImportService, + private readonly twentyORMManager: TwentyORMManager, + ) {} + + @Process(CalendarImportJob.name) + async handle(data: CalendarEventsImportJobData): Promise { + console.time('CalendarEventsImportJob time'); + + const { calendarChannelId, workspaceId } = data; + + const calendarChannelRepository = + await this.twentyORMManager.getRepository( + 'calendarChannel', + ); + const calendarChannel = await calendarChannelRepository.findOne({ + where: { + id: calendarChannelId, + isSyncEnabled: true, + }, + relations: ['connectedAccount'], + }); + + if (!calendarChannel?.isSyncEnabled) { + return; + } + + if ( + isThrottled( + calendarChannel.syncStageStartedAt, + calendarChannel.throttleFailureCount, + ) + ) { + return; + } + + if ( + calendarChannel.syncStage !== + CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING + ) { + return; + } + + await this.calendarEventsImportService.processCalendarEventsImport( + calendarChannel, + calendarChannel.connectedAccount, + workspaceId, + ); + + console.timeEnd('CalendarEventsImportJob time'); + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts index 0dd215b88..826a73a82 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service.ts @@ -2,84 +2,86 @@ import { Injectable } from '@nestjs/common'; import { Any } from 'typeorm'; +import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; +import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository'; import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity'; import { CalendarEventCleanerService } from 'src/modules/calendar/calendar-event-cleaner/services/calendar-event-cleaner.service'; +import { CALENDAR_EVENT_IMPORT_BATCH_SIZE } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-event-import-batch-size'; +import { MicrosoftCalendarImportEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service'; import { CalendarEventImportErrorHandlerService, CalendarEventImportSyncStep, } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service'; -import { - CalendarGetCalendarEventsService, - GetCalendarEventsResponse, -} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service'; import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service'; import { filterEventsAndReturnCancelledEvents } from 'src/modules/calendar/calendar-event-import-manager/utils/filter-events.util'; import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; import { CalendarChannelEventAssociationWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel-event-association.workspace-entity'; -import { - CalendarChannelSyncStage, - CalendarChannelWorkspaceEntity, -} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; +import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; +import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event'; import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; @Injectable() export class CalendarEventsImportService { constructor( + @InjectCacheStorage(CacheStorageNamespace.ModuleCalendar) + private readonly cacheStorage: CacheStorageService, private readonly twentyORMManager: TwentyORMManager, @InjectObjectMetadataRepository(BlocklistWorkspaceEntity) private readonly blocklistRepository: BlocklistRepository, private readonly calendarEventCleanerService: CalendarEventCleanerService, private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService, - private readonly getCalendarEventsService: CalendarGetCalendarEventsService, private readonly calendarSaveEventsService: CalendarSaveEventsService, private readonly calendarEventImportErrorHandlerService: CalendarEventImportErrorHandlerService, + private readonly microsoftCalendarImportEventService: MicrosoftCalendarImportEventsService, ) {} public async processCalendarEventsImport( calendarChannel: CalendarChannelWorkspaceEntity, connectedAccount: ConnectedAccountWorkspaceEntity, workspaceId: string, + fetchedCalendarEvents?: CalendarEventWithParticipants[], ): Promise { - const syncStep = - calendarChannel.syncStage === - CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING - ? CalendarEventImportSyncStep.FULL_CALENDAR_EVENT_LIST_FETCH - : CalendarEventImportSyncStep.PARTIAL_CALENDAR_EVENT_LIST_FETCH; - - await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing( + await this.calendarChannelSyncStatusService.markAsCalendarEventsImportOngoing( [calendarChannel.id], ); - let calendarEvents: GetCalendarEventsResponse['calendarEvents'] = []; - let nextSyncCursor: GetCalendarEventsResponse['nextSyncCursor'] = ''; + + let calendarEvents: CalendarEventWithParticipants[] = []; try { - const getCalendarEventsResponse = - await this.getCalendarEventsService.getCalendarEvents( - connectedAccount, - calendarChannel.syncCursor, + if (fetchedCalendarEvents) { + calendarEvents = fetchedCalendarEvents; + } else { + const eventIdsToFetch: string[] = await this.cacheStorage.setPop( + `calendar-events-to-import:${workspaceId}:${calendarChannel.id}`, + CALENDAR_EVENT_IMPORT_BATCH_SIZE, ); - calendarEvents = getCalendarEventsResponse.calendarEvents; - nextSyncCursor = getCalendarEventsResponse.nextSyncCursor; + if (!eventIdsToFetch || eventIdsToFetch.length === 0) { + await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialCalendarEventListFetch( + [calendarChannel.id], + ); - const calendarChannelRepository = - await this.twentyORMManager.getRepository( - 'calendarChannel', - ); + return; + } + + switch (connectedAccount.provider) { + case 'microsoft': + calendarEvents = + await this.microsoftCalendarImportEventService.getCalendarEvents( + connectedAccount, + eventIdsToFetch, + ); + break; + default: + break; + } + } if (!calendarEvents || calendarEvents?.length === 0) { - await calendarChannelRepository.update( - { - id: calendarChannel.id, - }, - { - syncCursor: nextSyncCursor, - }, - ); - await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch( [calendarChannel.id], ); @@ -127,22 +129,13 @@ export class CalendarEventsImportService { workspaceId, ); - await calendarChannelRepository.update( - { - id: calendarChannel.id, - }, - { - syncCursor: nextSyncCursor, - }, - ); - await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialCalendarEventListFetch( [calendarChannel.id], ); } catch (error) { await this.calendarEventImportErrorHandlerService.handleDriverException( error, - syncStep, + CalendarEventImportSyncStep.CALENDAR_EVENTS_IMPORT, calendarChannel, workspaceId, ); diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service.ts new file mode 100644 index 000000000..bdea7c2f9 --- /dev/null +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service.ts @@ -0,0 +1,144 @@ +import { Injectable } from '@nestjs/common'; + +import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator'; +import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service'; +import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum'; +import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; +import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { + CalendarEventImportDriverException, + CalendarEventImportDriverExceptionCode, +} from 'src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception'; +import { CalendarEventsImportJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job'; +import { CalendarImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job'; +import { + CalendarEventImportErrorHandlerService, + CalendarEventImportSyncStep, +} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service'; +import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service'; +import { CalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service'; +import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service'; +import { + CalendarChannelSyncStage, + CalendarChannelWorkspaceEntity, +} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity'; +import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; + +@Injectable() +export class CalendarFetchEventsService { + constructor( + @InjectCacheStorage(CacheStorageNamespace.ModuleCalendar) + private readonly cacheStorage: CacheStorageService, + private readonly twentyORMManager: TwentyORMManager, + private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService, + private readonly getCalendarEventsService: CalendarGetCalendarEventsService, + private readonly calendarEventImportErrorHandlerService: CalendarEventImportErrorHandlerService, + private readonly calendarEventsImportService: CalendarEventsImportService, + @InjectMessageQueue(MessageQueue.messagingQueue) + private readonly messageQueueService: MessageQueueService, + + ) {} + + public async fetchCalendarEvents( + calendarChannel: CalendarChannelWorkspaceEntity, + connectedAccount: ConnectedAccountWorkspaceEntity, + workspaceId: string, + ): Promise { + const syncStep = + calendarChannel.syncStage === + CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING + ? CalendarEventImportSyncStep.FULL_CALENDAR_EVENT_LIST_FETCH + : CalendarEventImportSyncStep.PARTIAL_CALENDAR_EVENT_LIST_FETCH; + + await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing( + [calendarChannel.id], + ); + + try { + const getCalendarEventsResponse = + await this.getCalendarEventsService.getCalendarEvents( + connectedAccount, + calendarChannel.syncCursor, + ); + + const hasFullEvents = getCalendarEventsResponse.fullEvents; + + const calendarEvents = hasFullEvents + ? getCalendarEventsResponse.calendarEvents + : null; + const calendarEventIds = getCalendarEventsResponse.calendarEventIds; + const nextSyncCursor = getCalendarEventsResponse.nextSyncCursor; + + const calendarChannelRepository = + await this.twentyORMManager.getRepository( + 'calendarChannel', + ); + + if (!calendarEvents || calendarEvents?.length === 0) { + await calendarChannelRepository.update( + { + id: calendarChannel.id, + }, + { + syncCursor: nextSyncCursor, + }, + ); + + await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch( + [calendarChannel.id], + ); + } + + await calendarChannelRepository.update( + { + id: calendarChannel.id, + }, + { + syncCursor: nextSyncCursor, + }, + ); + + if (hasFullEvents && calendarEvents) { + // Event Import already done + this.calendarEventsImportService.processCalendarEventsImport( + calendarChannel, + connectedAccount, + workspaceId, + calendarEvents, + ); + } else if (!hasFullEvents && calendarEventIds) { + // Event Import still needed + + await this.cacheStorage.setAdd( + `calendar-events-to-import:${workspaceId}:${calendarChannel.id}`, + calendarEventIds, + ); + + await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport( + [calendarChannel.id], + ); + + this.calendarEventsImportService.processCalendarEventsImport( + calendarChannel, + connectedAccount, + workspaceId, + ); + + } else { + throw new CalendarEventImportDriverException( + "Expected 'calendarEvents' or 'calendarEventIds' to be present", + CalendarEventImportDriverExceptionCode.UNKNOWN, + ); + } + } catch (error) { + await this.calendarEventImportErrorHandlerService.handleDriverException( + error, + syncStep, + calendarChannel, + workspaceId, + ); + } + } +} diff --git a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service.ts b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service.ts index f13eccd9b..6704fa355 100644 --- a/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service.ts +++ b/packages/twenty-server/src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service.ts @@ -10,7 +10,9 @@ import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity'; export type GetCalendarEventsResponse = { - calendarEvents: CalendarEventWithParticipants[]; + fullEvents: boolean; + calendarEvents?: CalendarEventWithParticipants[]; + calendarEventIds?: string[]; nextSyncCursor: string; };