import in two steps

This commit is contained in:
brendanlaschke
2024-10-27 18:43:05 +01:00
parent f1f00d7425
commit a1daf1d0bf
14 changed files with 468 additions and 105 deletions

View File

@@ -10,15 +10,19 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { CalendarEventCleanerModule } from 'src/modules/calendar/calendar-event-cleaner/calendar-event-cleaner.module';
import { CalendarEventListFetchCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-event-list-fetch.cron.command';
import { CalendarImportCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-import.cron.command';
import { CalendarOngoingStaleCronCommand } from 'src/modules/calendar/calendar-event-import-manager/crons/commands/calendar-ongoing-stale.cron.command';
import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
import { CalendarImportCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job';
import { CalendarOngoingStaleCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-ongoing-stale.cron.job';
import { GoogleCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/google-calendar/google-calendar-driver.module';
import { MicrosoftCalendarDriverModule } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/microsoft-calendar-driver.module';
import { CalendarEventListFetchJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
import { CalendarImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job';
import { CalendarOngoingStaleJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-ongoing-stale.job';
import { CalendarEventImportErrorHandlerService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service';
import { CalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service';
import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service';
import { CalendarEventParticipantManagerModule } from 'src/modules/calendar/calendar-event-participant-manager/calendar-event-participant-manager.module';
@@ -50,16 +54,20 @@ import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/sta
providers: [
CalendarChannelSyncStatusService,
CalendarEventsImportService,
CalendarFetchEventsService,
CalendarEventImportErrorHandlerService,
CalendarGetCalendarEventsService,
CalendarSaveEventsService,
CalendarEventListFetchCronJob,
CalendarEventListFetchCronCommand,
CalendarEventListFetchJob,
CalendarImportCronJob,
CalendarImportCronCommand,
CalendarImportJob,
CalendarOngoingStaleCronJob,
CalendarOngoingStaleCronCommand,
CalendarOngoingStaleJob,
],
exports: [CalendarEventsImportService],
exports: [CalendarEventsImportService, CalendarFetchEventsService],
})
export class CalendarEventImportManagerModule {}

View File

@@ -0,0 +1 @@
export const CALENDAR_EVENT_IMPORT_BATCH_SIZE = 100;

View File

@@ -3,10 +3,8 @@ import { Command, CommandRunner } from 'nest-commander';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
CALENDAR_EVENTS_IMPORT_CRON_PATTERN,
CalendarEventListFetchCronJob,
} from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
import { CalendarEventListFetchCronJob } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-event-list-fetch.cron.job';
import { CALENDAR_EVENTS_IMPORT_CRON_PATTERN } from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job';
@Command({
name: 'cron:calendar:calendar-event-list-fetch',

View File

@@ -0,0 +1,32 @@
import { Command, CommandRunner } from 'nest-commander';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
CALENDAR_EVENTS_IMPORT_CRON_PATTERN,
CalendarImportCronJob,
} from 'src/modules/calendar/calendar-event-import-manager/crons/jobs/calendar-import.cron.job';
@Command({
name: 'cron:calendar:calendar-import',
description: 'Starts a cron job to import the calendar events',
})
export class CalendarImportCronCommand extends CommandRunner {
constructor(
@InjectMessageQueue(MessageQueue.cronQueue)
private readonly messageQueueService: MessageQueueService,
) {
super();
}
async run(): Promise<void> {
await this.messageQueueService.addCron<undefined>(
CalendarImportCronJob.name,
undefined,
{
repeat: { pattern: CALENDAR_EVENTS_IMPORT_CRON_PATTERN },
},
);
}
}

View File

@@ -0,0 +1,87 @@
import { InjectRepository } from '@nestjs/typeorm';
import { Equal, Repository } from 'typeorm';
import { SentryCronMonitor } from 'src/engine/core-modules/cron/sentry-cron-monitor.decorator';
import { ExceptionHandlerService } from 'src/engine/core-modules/exception-handler/exception-handler.service';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import {
Workspace,
WorkspaceActivationStatus,
} from 'src/engine/core-modules/workspace/workspace.entity';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { CalendarEventsImportJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
import { CalendarImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job';
import { CalendarChannelSyncStage } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
export const CALENDAR_EVENTS_IMPORT_CRON_PATTERN = '*/5 * * * *';
@Processor({
queueName: MessageQueue.cronQueue,
})
export class CalendarImportCronJob {
constructor(
@InjectRepository(Workspace, 'core')
private readonly workspaceRepository: Repository<Workspace>,
@InjectMessageQueue(MessageQueue.calendarQueue)
private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {}
@Process(CalendarImportCronJob.name)
@SentryCronMonitor(
CalendarImportCronJob.name,
CALENDAR_EVENTS_IMPORT_CRON_PATTERN,
)
async handle(): Promise<void> {
console.time('CalendarImportCronJob time');
const activeWorkspaces = await this.workspaceRepository.find({
where: {
activationStatus: WorkspaceActivationStatus.ACTIVE,
},
});
for (const activeWorkspace of activeWorkspaces) {
try {
const calendarChannelRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace(
activeWorkspace.id,
'calendarChannel',
);
const calendarChannels = await calendarChannelRepository.find({
where: {
isSyncEnabled: true,
syncStage: Equal(
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING,
),
},
});
for (const calendarChannel of calendarChannels) {
await this.messageQueueService.add<CalendarEventsImportJobData>(
CalendarImportJob.name,
{
calendarChannelId: calendarChannel.id,
workspaceId: activeWorkspace.id,
},
);
}
} catch (error) {
this.exceptionHandlerService.captureExceptions([error], {
user: {
workspaceId: activeWorkspace.id,
},
});
}
}
console.timeEnd('CalendarImportCronJob time');
}
}

View File

@@ -72,6 +72,7 @@ export class GoogleCalendarGetEventsService {
}
return {
fullEvents: true,
calendarEvents: formatGoogleCalendarEvents(events),
nextSyncCursor: nextSyncToken || '',
};

View File

@@ -2,14 +2,19 @@ import { Module } from '@nestjs/common';
import { EnvironmentModule } from 'src/engine/core-modules/environment/environment.module';
import { MicrosoftCalendarGetEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-get-events.service';
import { MicrosoftCalendarImportEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service';
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
@Module({
imports: [EnvironmentModule],
providers: [
MicrosoftCalendarGetEventsService,
MicrosoftCalendarImportEventsService,
MicrosoftOAuth2ClientManagerService,
],
exports: [MicrosoftCalendarGetEventsService],
exports: [
MicrosoftCalendarGetEventsService,
MicrosoftCalendarImportEventsService,
],
})
export class MicrosoftCalendarDriverModule {}

View File

@@ -5,9 +5,7 @@ import {
PageIterator,
PageIteratorCallback,
} from '@microsoft/microsoft-graph-client';
import { Event } from '@microsoft/microsoft-graph-types';
import { formatMicrosoftCalendarEvents } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/format-microsoft-calendar-event.util';
import { parseMicrosoftCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/parse-microsoft-calendar-error.util';
import { GetCalendarEventsResponse } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service';
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
@@ -31,61 +29,34 @@ export class MicrosoftCalendarGetEventsService {
await this.microsoftOAuth2ClientManagerService.getOAuth2Client(
connectedAccount.refreshToken,
);
const eventIds: string[] = [];
const { changedEvents, nextSyncToken } =
await this.getChangedCalendarEventIds(connectedAccount, syncCursor);
const response: PageCollection = await microsoftClient
.api(syncCursor || '/me/calendar/events/delta')
.version('beta')
.get();
const events: Event[] = [];
const callback: PageIteratorCallback = (data) => {
eventIds.push(data.id);
for (const changedEvent of changedEvents) {
const event = await microsoftClient
.api(`/me/calendar/events/${changedEvent.id}`)
.get();
return true;
};
events.push(event);
}
const pageIterator = new PageIterator(
microsoftClient,
response,
callback,
);
await pageIterator.iterate();
return {
calendarEvents: formatMicrosoftCalendarEvents(events),
nextSyncCursor: nextSyncToken || '',
fullEvents: false,
calendarEventIds: eventIds,
nextSyncCursor: pageIterator.getDeltaLink() || '',
};
} catch (error) {
throw parseMicrosoftCalendarError(error);
}
}
private async getChangedCalendarEventIds(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'provider' | 'refreshToken' | 'id'
>,
syncCursor?: string,
): Promise<{
changedEvents: Pick<Event, 'id'>[];
nextSyncToken?: string;
}> {
const microsoftClient =
await this.microsoftOAuth2ClientManagerService.getOAuth2Client(
connectedAccount.refreshToken,
);
const events: Event[] = [];
const response: PageCollection = await microsoftClient
.api(syncCursor || '/me/calendar/events/delta')
.version('beta')
.get();
const callback: PageIteratorCallback = (data) => {
events.push(data);
return true;
};
const pageIterator = new PageIterator(microsoftClient, response, callback);
await pageIterator.iterate();
return {
changedEvents: events,
nextSyncToken: pageIterator.getDeltaLink(),
};
}
}

View File

@@ -0,0 +1,45 @@
import { Injectable } from '@nestjs/common';
import { Event } from '@microsoft/microsoft-graph-types';
import { formatMicrosoftCalendarEvents } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/format-microsoft-calendar-event.util';
import { parseMicrosoftCalendarError } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/utils/parse-microsoft-calendar-error.util';
import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event';
import { MicrosoftOAuth2ClientManagerService } from 'src/modules/connected-account/oauth2-client-manager/drivers/microsoft/microsoft-oauth2-client-manager.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Injectable()
export class MicrosoftCalendarImportEventsService {
constructor(
private readonly microsoftOAuth2ClientManagerService: MicrosoftOAuth2ClientManagerService,
) {}
public async getCalendarEvents(
connectedAccount: Pick<
ConnectedAccountWorkspaceEntity,
'provider' | 'refreshToken' | 'id'
>,
changedEventIds: string[],
): Promise<CalendarEventWithParticipants[]> {
try {
const microsoftClient =
await this.microsoftOAuth2ClientManagerService.getOAuth2Client(
connectedAccount.refreshToken,
);
const events: Event[] = [];
for (const changedEventId of changedEventIds) {
const event = await microsoftClient
.api(`/me/calendar/events/${changedEventId}`)
.get();
events.push(event);
}
return formatMicrosoftCalendarEvents(events);
} catch (error) {
throw parseMicrosoftCalendarError(error);
}
}
}

View File

@@ -5,6 +5,7 @@ import { Processor } from 'src/engine/core-modules/message-queue/decorators/proc
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import { CalendarFetchEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-fetch-events.service';
import {
CalendarChannelSyncStage,
CalendarChannelWorkspaceEntity,
@@ -23,7 +24,7 @@ export type CalendarEventsImportJobData = {
export class CalendarEventListFetchJob {
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly calendarEventsImportService: CalendarEventsImportService,
private readonly calendarFetchEventsService: CalendarFetchEventsService,
) {}
@Process(CalendarEventListFetchJob.name)
@@ -65,7 +66,7 @@ export class CalendarEventListFetchJob {
syncStageStartedAt: null,
});
await this.calendarEventsImportService.processCalendarEventsImport(
await this.calendarFetchEventsService.fetchCalendarEvents(
calendarChannel,
calendarChannel.connectedAccount,
workspaceId,
@@ -73,7 +74,7 @@ export class CalendarEventListFetchJob {
break;
case CalendarChannelSyncStage.PARTIAL_CALENDAR_EVENT_LIST_FETCH_PENDING:
await this.calendarEventsImportService.processCalendarEventsImport(
await this.calendarFetchEventsService.fetchCalendarEvents(
calendarChannel,
calendarChannel.connectedAccount,
workspaceId,

View File

@@ -0,0 +1,75 @@
import { Scope } from '@nestjs/common';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import {
CalendarChannelSyncStage,
CalendarChannelWorkspaceEntity,
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { isThrottled } from 'src/modules/connected-account/utils/is-throttled';
export type CalendarEventsImportJobData = {
calendarChannelId: string;
workspaceId: string;
};
@Processor({
queueName: MessageQueue.calendarQueue,
scope: Scope.REQUEST,
})
export class CalendarImportJob {
constructor(
private readonly calendarEventsImportService: CalendarEventsImportService,
private readonly twentyORMManager: TwentyORMManager,
) {}
@Process(CalendarImportJob.name)
async handle(data: CalendarEventsImportJobData): Promise<void> {
console.time('CalendarEventsImportJob time');
const { calendarChannelId, workspaceId } = data;
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
'calendarChannel',
);
const calendarChannel = await calendarChannelRepository.findOne({
where: {
id: calendarChannelId,
isSyncEnabled: true,
},
relations: ['connectedAccount'],
});
if (!calendarChannel?.isSyncEnabled) {
return;
}
if (
isThrottled(
calendarChannel.syncStageStartedAt,
calendarChannel.throttleFailureCount,
)
) {
return;
}
if (
calendarChannel.syncStage !==
CalendarChannelSyncStage.CALENDAR_EVENTS_IMPORT_PENDING
) {
return;
}
await this.calendarEventsImportService.processCalendarEventsImport(
calendarChannel,
calendarChannel.connectedAccount,
workspaceId,
);
console.timeEnd('CalendarEventsImportJob time');
}
}

View File

@@ -2,84 +2,86 @@ import { Injectable } from '@nestjs/common';
import { Any } from 'typeorm';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { BlocklistRepository } from 'src/modules/blocklist/repositories/blocklist.repository';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { CalendarEventCleanerService } from 'src/modules/calendar/calendar-event-cleaner/services/calendar-event-cleaner.service';
import { CALENDAR_EVENT_IMPORT_BATCH_SIZE } from 'src/modules/calendar/calendar-event-import-manager/constants/calendar-event-import-batch-size';
import { MicrosoftCalendarImportEventsService } from 'src/modules/calendar/calendar-event-import-manager/drivers/microsoft-calendar/services/microsoft-calendar-import-events.service';
import {
CalendarEventImportErrorHandlerService,
CalendarEventImportSyncStep,
} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
import {
CalendarGetCalendarEventsService,
GetCalendarEventsResponse,
} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service';
import { CalendarSaveEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-save-events.service';
import { filterEventsAndReturnCancelledEvents } from 'src/modules/calendar/calendar-event-import-manager/utils/filter-events.util';
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
import { CalendarChannelEventAssociationWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel-event-association.workspace-entity';
import {
CalendarChannelSyncStage,
CalendarChannelWorkspaceEntity,
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { CalendarChannelWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types/calendar-event';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Injectable()
export class CalendarEventsImportService {
constructor(
@InjectCacheStorage(CacheStorageNamespace.ModuleCalendar)
private readonly cacheStorage: CacheStorageService,
private readonly twentyORMManager: TwentyORMManager,
@InjectObjectMetadataRepository(BlocklistWorkspaceEntity)
private readonly blocklistRepository: BlocklistRepository,
private readonly calendarEventCleanerService: CalendarEventCleanerService,
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
private readonly getCalendarEventsService: CalendarGetCalendarEventsService,
private readonly calendarSaveEventsService: CalendarSaveEventsService,
private readonly calendarEventImportErrorHandlerService: CalendarEventImportErrorHandlerService,
private readonly microsoftCalendarImportEventService: MicrosoftCalendarImportEventsService,
) {}
public async processCalendarEventsImport(
calendarChannel: CalendarChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
fetchedCalendarEvents?: CalendarEventWithParticipants[],
): Promise<void> {
const syncStep =
calendarChannel.syncStage ===
CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING
? CalendarEventImportSyncStep.FULL_CALENDAR_EVENT_LIST_FETCH
: CalendarEventImportSyncStep.PARTIAL_CALENDAR_EVENT_LIST_FETCH;
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing(
await this.calendarChannelSyncStatusService.markAsCalendarEventsImportOngoing(
[calendarChannel.id],
);
let calendarEvents: GetCalendarEventsResponse['calendarEvents'] = [];
let nextSyncCursor: GetCalendarEventsResponse['nextSyncCursor'] = '';
let calendarEvents: CalendarEventWithParticipants[] = [];
try {
const getCalendarEventsResponse =
await this.getCalendarEventsService.getCalendarEvents(
connectedAccount,
calendarChannel.syncCursor,
if (fetchedCalendarEvents) {
calendarEvents = fetchedCalendarEvents;
} else {
const eventIdsToFetch: string[] = await this.cacheStorage.setPop(
`calendar-events-to-import:${workspaceId}:${calendarChannel.id}`,
CALENDAR_EVENT_IMPORT_BATCH_SIZE,
);
calendarEvents = getCalendarEventsResponse.calendarEvents;
nextSyncCursor = getCalendarEventsResponse.nextSyncCursor;
if (!eventIdsToFetch || eventIdsToFetch.length === 0) {
await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialCalendarEventListFetch(
[calendarChannel.id],
);
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
'calendarChannel',
);
return;
}
switch (connectedAccount.provider) {
case 'microsoft':
calendarEvents =
await this.microsoftCalendarImportEventService.getCalendarEvents(
connectedAccount,
eventIdsToFetch,
);
break;
default:
break;
}
}
if (!calendarEvents || calendarEvents?.length === 0) {
await calendarChannelRepository.update(
{
id: calendarChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch(
[calendarChannel.id],
);
@@ -127,22 +129,13 @@ export class CalendarEventsImportService {
workspaceId,
);
await calendarChannelRepository.update(
{
id: calendarChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
await this.calendarChannelSyncStatusService.markAsCompletedAndSchedulePartialCalendarEventListFetch(
[calendarChannel.id],
);
} catch (error) {
await this.calendarEventImportErrorHandlerService.handleDriverException(
error,
syncStep,
CalendarEventImportSyncStep.CALENDAR_EVENTS_IMPORT,
calendarChannel,
workspaceId,
);

View File

@@ -0,0 +1,144 @@
import { Injectable } from '@nestjs/common';
import { InjectCacheStorage } from 'src/engine/core-modules/cache-storage/decorators/cache-storage.decorator';
import { CacheStorageService } from 'src/engine/core-modules/cache-storage/services/cache-storage.service';
import { CacheStorageNamespace } from 'src/engine/core-modules/cache-storage/types/cache-storage-namespace.enum';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import {
CalendarEventImportDriverException,
CalendarEventImportDriverExceptionCode,
} from 'src/modules/calendar/calendar-event-import-manager/drivers/exceptions/calendar-event-import-driver.exception';
import { CalendarEventsImportJobData } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-event-list-fetch.job';
import { CalendarImportJob } from 'src/modules/calendar/calendar-event-import-manager/jobs/calendar-import.job';
import {
CalendarEventImportErrorHandlerService,
CalendarEventImportSyncStep,
} from 'src/modules/calendar/calendar-event-import-manager/services/calendar-event-import-exception-handler.service';
import { CalendarEventsImportService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-events-import.service';
import { CalendarGetCalendarEventsService } from 'src/modules/calendar/calendar-event-import-manager/services/calendar-get-events.service';
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
import {
CalendarChannelSyncStage,
CalendarChannelWorkspaceEntity,
} from 'src/modules/calendar/common/standard-objects/calendar-channel.workspace-entity';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
@Injectable()
export class CalendarFetchEventsService {
constructor(
@InjectCacheStorage(CacheStorageNamespace.ModuleCalendar)
private readonly cacheStorage: CacheStorageService,
private readonly twentyORMManager: TwentyORMManager,
private readonly calendarChannelSyncStatusService: CalendarChannelSyncStatusService,
private readonly getCalendarEventsService: CalendarGetCalendarEventsService,
private readonly calendarEventImportErrorHandlerService: CalendarEventImportErrorHandlerService,
private readonly calendarEventsImportService: CalendarEventsImportService,
@InjectMessageQueue(MessageQueue.messagingQueue)
private readonly messageQueueService: MessageQueueService,
) {}
public async fetchCalendarEvents(
calendarChannel: CalendarChannelWorkspaceEntity,
connectedAccount: ConnectedAccountWorkspaceEntity,
workspaceId: string,
): Promise<void> {
const syncStep =
calendarChannel.syncStage ===
CalendarChannelSyncStage.FULL_CALENDAR_EVENT_LIST_FETCH_PENDING
? CalendarEventImportSyncStep.FULL_CALENDAR_EVENT_LIST_FETCH
: CalendarEventImportSyncStep.PARTIAL_CALENDAR_EVENT_LIST_FETCH;
await this.calendarChannelSyncStatusService.markAsCalendarEventListFetchOngoing(
[calendarChannel.id],
);
try {
const getCalendarEventsResponse =
await this.getCalendarEventsService.getCalendarEvents(
connectedAccount,
calendarChannel.syncCursor,
);
const hasFullEvents = getCalendarEventsResponse.fullEvents;
const calendarEvents = hasFullEvents
? getCalendarEventsResponse.calendarEvents
: null;
const calendarEventIds = getCalendarEventsResponse.calendarEventIds;
const nextSyncCursor = getCalendarEventsResponse.nextSyncCursor;
const calendarChannelRepository =
await this.twentyORMManager.getRepository<CalendarChannelWorkspaceEntity>(
'calendarChannel',
);
if (!calendarEvents || calendarEvents?.length === 0) {
await calendarChannelRepository.update(
{
id: calendarChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
await this.calendarChannelSyncStatusService.schedulePartialCalendarEventListFetch(
[calendarChannel.id],
);
}
await calendarChannelRepository.update(
{
id: calendarChannel.id,
},
{
syncCursor: nextSyncCursor,
},
);
if (hasFullEvents && calendarEvents) {
// Event Import already done
this.calendarEventsImportService.processCalendarEventsImport(
calendarChannel,
connectedAccount,
workspaceId,
calendarEvents,
);
} else if (!hasFullEvents && calendarEventIds) {
// Event Import still needed
await this.cacheStorage.setAdd(
`calendar-events-to-import:${workspaceId}:${calendarChannel.id}`,
calendarEventIds,
);
await this.calendarChannelSyncStatusService.scheduleCalendarEventsImport(
[calendarChannel.id],
);
this.calendarEventsImportService.processCalendarEventsImport(
calendarChannel,
connectedAccount,
workspaceId,
);
} else {
throw new CalendarEventImportDriverException(
"Expected 'calendarEvents' or 'calendarEventIds' to be present",
CalendarEventImportDriverExceptionCode.UNKNOWN,
);
}
} catch (error) {
await this.calendarEventImportErrorHandlerService.handleDriverException(
error,
syncStep,
calendarChannel,
workspaceId,
);
}
}
}

View File

@@ -10,7 +10,9 @@ import { CalendarEventWithParticipants } from 'src/modules/calendar/common/types
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
export type GetCalendarEventsResponse = {
calendarEvents: CalendarEventWithParticipants[];
fullEvents: boolean;
calendarEvents?: CalendarEventWithParticipants[];
calendarEventIds?: string[];
nextSyncCursor: string;
};