diff --git a/packages/twenty-front/src/testing/mock-data/generated/standard-metadata-query-result.ts b/packages/twenty-front/src/testing/mock-data/generated/standard-metadata-query-result.ts index 4446ad4f5..b0e7590a9 100644 --- a/packages/twenty-front/src/testing/mock-data/generated/standard-metadata-query-result.ts +++ b/packages/twenty-front/src/testing/mock-data/generated/standard-metadata-query-result.ts @@ -6903,9 +6903,9 @@ export const mockedStandardObjectMetadataQueryResult: ObjectMetadataItemsQuery = options: null, id: '24147b01-4394-4aee-92a4-5f6b5073704f', type: 'DATE_TIME', - name: 'ongoingSyncStartedAt', - label: 'Ongoing sync started at', - description: 'Ongoing sync started at', + name: 'syncStageStartedAt', + label: 'Sync stage started at', + description: 'Sync stage started at', icon: 'IconHistory', isCustom: false, isActive: true, diff --git a/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channels.ts b/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channels.ts index 167061d11..2c1f4f399 100644 --- a/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channels.ts +++ b/packages/twenty-server/src/database/typeorm-seeds/workspace/message-channels.ts @@ -1,7 +1,7 @@ import { EntityManager } from 'typeorm'; import { DEV_SEED_CONNECTED_ACCOUNT_IDS } from 'src/database/typeorm-seeds/workspace/connected-account'; -import { MessageChannelSyncSubStatus } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; +import { MessageChannelSyncStage } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; const tableName = 'messageChannel'; @@ -28,7 +28,7 @@ export const seedMessageChannel = async ( 'connectedAccountId', 'handle', 'visibility', - 'syncSubStatus', + 'syncStage', ]) .orIgnore() .values([ @@ -42,8 +42,7 @@ export const seedMessageChannel = async ( connectedAccountId: DEV_SEED_CONNECTED_ACCOUNT_IDS.TIM, handle: 'tim@apple.dev', visibility: 'share_everything', - syncSubStatus: - MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING, + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, }, { id: DEV_SEED_MESSAGE_CHANNEL_IDS.JONY, @@ -55,8 +54,7 @@ export const seedMessageChannel = async ( connectedAccountId: DEV_SEED_CONNECTED_ACCOUNT_IDS.JONY, handle: 'jony.ive@apple.dev', visibility: 'share_everything', - syncSubStatus: - MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING, + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, }, { id: DEV_SEED_MESSAGE_CHANNEL_IDS.PHIL, @@ -68,8 +66,7 @@ export const seedMessageChannel = async ( connectedAccountId: DEV_SEED_CONNECTED_ACCOUNT_IDS.PHIL, handle: 'phil.schiler@apple.dev', visibility: 'share_everything', - syncSubStatus: - MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING, + syncStage: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, }, ]) .execute(); diff --git a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts index 0acd8fe31..21910d0a7 100644 --- a/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts +++ b/packages/twenty-server/src/engine/workspace-manager/workspace-sync-metadata/constants/standard-field-ids.ts @@ -70,7 +70,6 @@ export const CALENDAR_CHANNEL_STANDARD_FIELD_IDS = { isSyncEnabled: '20202020-fe19-4818-8854-21f7b1b43395', syncCursor: '20202020-bac2-4852-a5cb-7a7898992b70', calendarChannelEventAssociations: '20202020-afb0-4a9f-979f-2d5087d71d09', - throttlePauseUntil: '20202020-16e8-40ca-be79-a3af4787af2c', throttleFailureCount: '20202020-525c-4b76-b9bd-0dd57fd11d61', }; @@ -208,9 +207,8 @@ export const MESSAGE_CHANNEL_STANDARD_FIELD_IDS = { syncCursor: '20202020-79d1-41cf-b738-bcf5ed61e256', syncedAt: '20202020-263d-4c6b-ad51-137ada56f7d4', syncStatus: '20202020-56a1-4f7e-9880-a8493bb899cc', - syncSubStatus: '20202020-7979-4b08-89fe-99cb5e698767', - ongoingSyncStartedAt: '20202020-8c61-4a42-ae63-73c1c3c52e06', - throttlePauseUntil: '20202020-a8cb-475b-868c-b83538614df4', + syncStage: '20202020-7979-4b08-89fe-99cb5e698767', + syncStageStartedAt: '20202020-8c61-4a42-ae63-73c1c3c52e06', throttleFailureCount: '20202020-0291-42be-9ad0-d578a51684ab', }; diff --git a/packages/twenty-server/src/modules/calendar/standard-objects/calendar-channel.workspace-entity.ts b/packages/twenty-server/src/modules/calendar/standard-objects/calendar-channel.workspace-entity.ts index 0bf11a6ae..85aed3b43 100644 --- a/packages/twenty-server/src/modules/calendar/standard-objects/calendar-channel.workspace-entity.ts +++ b/packages/twenty-server/src/modules/calendar/standard-objects/calendar-channel.workspace-entity.ts @@ -15,7 +15,6 @@ import { WorkspaceIsSystem } from 'src/engine/twenty-orm/decorators/workspace-is import { WorkspaceIsNotAuditLogged } from 'src/engine/twenty-orm/decorators/workspace-is-not-audit-logged.decorator'; import { WorkspaceField } from 'src/engine/twenty-orm/decorators/workspace-field.decorator'; import { WorkspaceRelation } from 'src/engine/twenty-orm/decorators/workspace-relation.decorator'; -import { WorkspaceIsNullable } from 'src/engine/twenty-orm/decorators/workspace-is-nullable.decorator'; export enum CalendarChannelVisibility { METADATA = 'METADATA', @@ -97,16 +96,6 @@ export class CalendarChannelWorkspaceEntity extends BaseWorkspaceEntity { }) syncCursor: string; - @WorkspaceField({ - standardId: CALENDAR_CHANNEL_STANDARD_FIELD_IDS.throttlePauseUntil, - type: FieldMetadataType.DATE_TIME, - label: 'Throttle Pause Until', - description: 'Throttle Pause Until', - icon: 'IconPlayerPause', - }) - @WorkspaceIsNullable() - throttlePauseUntil: Date; - @WorkspaceField({ standardId: CALENDAR_CHANNEL_STANDARD_FIELD_IDS.throttleFailureCount, type: FieldMetadataType.NUMBER, diff --git a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts index a6b2f9585..fa229445b 100644 --- a/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts +++ b/packages/twenty-server/src/modules/messaging/common/repositories/message-channel.repository.ts @@ -7,7 +7,7 @@ import { ObjectRecord } from 'src/engine/workspace-manager/workspace-sync-metada import { MessageChannelWorkspaceEntity, MessageChannelSyncStatus, - MessageChannelSyncSubStatus, + MessageChannelSyncStage, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @Injectable() @@ -51,7 +51,7 @@ export class MessageChannelRepository { this.workspaceDataSourceService.getSchemaName(workspaceId); await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = NULL, "syncCursor" = '', "ongoingSyncStartedAt" = NULL + `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = NULL, "syncCursor" = '', "syncStageStartedAt" = NULL WHERE "connectedAccountId" = $1`, [connectedAccountId], workspaceId, @@ -169,18 +169,11 @@ export class MessageChannelRepository { this.workspaceDataSourceService.getSchemaName(workspaceId); const needsToUpdateSyncedAt = - syncStatus === MessageChannelSyncStatus.SUCCEEDED; - - const needsToUpdateOngoingSyncStartedAt = - syncStatus === MessageChannelSyncStatus.ONGOING; + syncStatus === MessageChannelSyncStatus.COMPLETED; await this.workspaceDataSourceService.executeRawQuery( `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStatus" = $1 ${ needsToUpdateSyncedAt ? `, "syncedAt" = NOW()` : '' - } ${ - needsToUpdateOngoingSyncStartedAt - ? `, "ongoingSyncStartedAt" = NOW()` - : `, "ongoingSyncStartedAt" = NULL` } WHERE "id" = $2`, [syncStatus, id], workspaceId, @@ -188,9 +181,31 @@ export class MessageChannelRepository { ); } - public async updateSyncSubStatus( + public async updateSyncStage( + id: string, + syncStage: MessageChannelSyncStage, + workspaceId: string, + transactionManager?: EntityManager, + ): Promise { + const dataSourceSchema = + this.workspaceDataSourceService.getSchemaName(workspaceId); + + const needsToUpdateSyncStageStartedAt = + syncStage === MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING || + syncStage === MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING; + + await this.workspaceDataSourceService.executeRawQuery( + `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStage" = $1 ${ + needsToUpdateSyncStageStartedAt ? `, "syncStageStartedAt" = NOW()` : '' + } WHERE "id" = $2`, + [syncStage, id], + workspaceId, + transactionManager, + ); + } + + public async resetSyncStageStartedAt( id: string, - syncSubStatus: MessageChannelSyncSubStatus, workspaceId: string, transactionManager?: EntityManager, ): Promise { @@ -198,8 +213,8 @@ export class MessageChannelRepository { this.workspaceDataSourceService.getSchemaName(workspaceId); await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "syncSubStatus" = $1 WHERE "id" = $2`, - [syncSubStatus, id], + `UPDATE ${dataSourceSchema}."messageChannel" SET "syncStageStartedAt" = NULL WHERE "id" = $1`, + [id], workspaceId, transactionManager, ); @@ -241,9 +256,8 @@ export class MessageChannelRepository { ); } - public async updateThrottlePauseUntilAndIncrementThrottleFailureCount( + public async incrementThrottleFailureCount( id: string, - throttleDurationMs: number, workspaceId: string, transactionManager?: EntityManager, ) { @@ -251,15 +265,15 @@ export class MessageChannelRepository { this.workspaceDataSourceService.getSchemaName(workspaceId); await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NOW() + ($1 || ' milliseconds')::interval, "throttleFailureCount" = "throttleFailureCount" + 1 - WHERE "id" = $2`, - [throttleDurationMs, id], + `UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = "throttleFailureCount" + 1 + WHERE "id" = $1`, + [id], workspaceId, transactionManager, ); } - public async resetThrottlePauseUntilAndThrottleFailureCount( + public async resetThrottleFailureCount( id: string, workspaceId: string, transactionManager?: EntityManager, @@ -268,7 +282,7 @@ export class MessageChannelRepository { this.workspaceDataSourceService.getSchemaName(workspaceId); await this.workspaceDataSourceService.executeRawQuery( - `UPDATE ${dataSourceSchema}."messageChannel" SET "throttlePauseUntil" = NULL, "throttleFailureCount" = 0 + `UPDATE ${dataSourceSchema}."messageChannel" SET "throttleFailureCount" = 0 WHERE "id" = $1`, [id], workspaceId, diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts b/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts index 1e1884576..bf472695a 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/messaging-channel-sync-status.service.ts @@ -7,7 +7,7 @@ import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repos import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessageChannelWorkspaceEntity, - MessageChannelSyncSubStatus, + MessageChannelSyncStage, MessageChannelSyncStatus, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; @@ -24,9 +24,9 @@ export class MessagingChannelSyncStatusService { messageChannelId: string, workspaceId: string, ) { - await this.messageChannelRepository.updateSyncSubStatus( + await this.messageChannelRepository.updateSyncStage( messageChannelId, - MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING, + MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, workspaceId, ); } @@ -35,9 +35,9 @@ export class MessagingChannelSyncStatusService { messageChannelId: string, workspaceId: string, ) { - await this.messageChannelRepository.updateSyncSubStatus( + await this.messageChannelRepository.updateSyncStage( messageChannelId, - MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, workspaceId, ); } @@ -46,9 +46,9 @@ export class MessagingChannelSyncStatusService { messageChannelId: string, workspaceId: string, ) { - await this.messageChannelRepository.updateSyncSubStatus( + await this.messageChannelRepository.updateSyncStage( messageChannelId, - MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING, + MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, workspaceId, ); } @@ -68,6 +68,16 @@ export class MessagingChannelSyncStatusService { workspaceId, ); + await this.messageChannelRepository.resetSyncStageStartedAt( + messageChannelId, + workspaceId, + ); + + await this.messageChannelRepository.resetThrottleFailureCount( + messageChannelId, + workspaceId, + ); + await this.scheduleFullMessageListFetch(messageChannelId, workspaceId); } @@ -75,9 +85,9 @@ export class MessagingChannelSyncStatusService { messageChannelId: string, workspaceId: string, ) { - await this.messageChannelRepository.updateSyncSubStatus( + await this.messageChannelRepository.updateSyncStage( messageChannelId, - MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING, + MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, workspaceId, ); @@ -105,9 +115,9 @@ export class MessagingChannelSyncStatusService { messageChannelId: string, workspaceId: string, ) { - await this.messageChannelRepository.updateSyncSubStatus( + await this.messageChannelRepository.updateSyncStage( messageChannelId, - MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING, + MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, workspaceId, ); } @@ -120,9 +130,9 @@ export class MessagingChannelSyncStatusService { `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, ); - await this.messageChannelRepository.updateSyncSubStatus( + await this.messageChannelRepository.updateSyncStage( messageChannelId, - MessageChannelSyncSubStatus.FAILED, + MessageChannelSyncStage.FAILED, workspaceId, ); @@ -141,9 +151,9 @@ export class MessagingChannelSyncStatusService { `messages-to-import:${workspaceId}:gmail:${messageChannelId}`, ); - await this.messageChannelRepository.updateSyncSubStatus( + await this.messageChannelRepository.updateSyncStage( messageChannelId, - MessageChannelSyncSubStatus.FAILED, + MessageChannelSyncStage.FAILED, workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/common/services/messaging-error-handling.service.ts b/packages/twenty-server/src/modules/messaging/common/services/messaging-error-handling.service.ts index b79fbfa38..13e9ca586 100644 --- a/packages/twenty-server/src/modules/messaging/common/services/messaging-error-handling.service.ts +++ b/packages/twenty-server/src/modules/messaging/common/services/messaging-error-handling.service.ts @@ -10,7 +10,6 @@ import { MessagingTelemetryService } from 'src/modules/messaging/common/services import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingChannelSyncStatusService } from 'src/modules/messaging/common/services/messaging-channel-sync-status.service'; import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; -import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration'; import { MESSAGING_THROTTLE_MAX_ATTEMPTS } from 'src/modules/messaging/common/constants/messaging-throttle-max-attempts'; type SyncStep = @@ -212,13 +211,8 @@ export class MessagingErrorHandlingService { messageChannel: ObjectRecord, workspaceId: string, ): Promise { - const throttleDuration = - MESSAGING_THROTTLE_DURATION * - Math.pow(2, messageChannel.throttleFailureCount); - - await this.messageChannelRepository.updateThrottlePauseUntilAndIncrementThrottleFailureCount( + await this.messageChannelRepository.incrementThrottleFailureCount( messageChannel.id, - throttleDuration, workspaceId, ); @@ -227,7 +221,7 @@ export class MessagingErrorHandlingService { workspaceId, connectedAccountId: messageChannel.connectedAccountId, messageChannelId: messageChannel.id, - message: `Throttling for ${throttleDuration}ms`, + message: `Increment throttle failure count to ${messageChannel.throttleFailureCount}`, }); } } diff --git a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts index ae24cc602..8f0081d30 100644 --- a/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts +++ b/packages/twenty-server/src/modules/messaging/common/standard-objects/message-channel.workspace-entity.ts @@ -31,7 +31,7 @@ export enum MessageChannelSyncStatus { FAILED_UNKNOWN = 'FAILED_UNKNOWN', } -export enum MessageChannelSyncSubStatus { +export enum MessageChannelSyncStage { FULL_MESSAGE_LIST_FETCH_PENDING = 'FULL_MESSAGE_LIST_FETCH_PENDING', PARTIAL_MESSAGE_LIST_FETCH_PENDING = 'PARTIAL_MESSAGE_LIST_FETCH_PENDING', MESSAGE_LIST_FETCH_ONGOING = 'MESSAGE_LIST_FETCH_ONGOING', @@ -227,72 +227,62 @@ export class MessageChannelWorkspaceEntity extends BaseWorkspaceEntity { syncStatus: MessageChannelSyncStatus; @WorkspaceField({ - standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.syncSubStatus, + standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.syncStage, type: FieldMetadataType.SELECT, - label: 'Sync sub status', - description: 'Sync sub status', + label: 'Sync stage', + description: 'Sync stage', icon: 'IconStatusChange', options: [ { - value: MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING, + value: MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING, label: 'Full messages list fetch pending', position: 0, color: 'blue', }, { - value: MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING, + value: MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING, label: 'Partial messages list fetch pending', position: 1, color: 'blue', }, { - value: MessageChannelSyncSubStatus.MESSAGE_LIST_FETCH_ONGOING, + value: MessageChannelSyncStage.MESSAGE_LIST_FETCH_ONGOING, label: 'Messages list fetch ongoing', position: 2, color: 'orange', }, { - value: MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING, + value: MessageChannelSyncStage.MESSAGES_IMPORT_PENDING, label: 'Messages import pending', position: 3, color: 'blue', }, { - value: MessageChannelSyncSubStatus.MESSAGES_IMPORT_ONGOING, + value: MessageChannelSyncStage.MESSAGES_IMPORT_ONGOING, label: 'Messages import ongoing', position: 4, color: 'orange', }, { - value: MessageChannelSyncSubStatus.FAILED, + value: MessageChannelSyncStage.FAILED, label: 'Failed', position: 5, color: 'red', }, ], - defaultValue: `'${MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING}'`, + defaultValue: `'${MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING}'`, }) - syncSubStatus: MessageChannelSyncSubStatus; + syncStage: MessageChannelSyncStage; @WorkspaceField({ - standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.ongoingSyncStartedAt, + standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.syncStageStartedAt, type: FieldMetadataType.DATE_TIME, - label: 'Ongoing sync started at', - description: 'Ongoing sync started at', + label: 'Sync stage started at', + description: 'Sync stage started at', icon: 'IconHistory', }) @WorkspaceIsNullable() - ongoingSyncStartedAt: string; - - @WorkspaceField({ - standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.throttlePauseUntil, - type: FieldMetadataType.DATE_TIME, - label: 'Throttle Pause Until', - description: 'Throttle Pause Until', - icon: 'IconPlayerPause', - }) - @WorkspaceIsNullable() - throttlePauseUntil: Date; + syncStageStartedAt: string; @WorkspaceField({ standardId: MESSAGE_CHANNEL_STANDARD_FIELD_IDS.throttleFailureCount, diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service.ts index 07e2c1eef..f5bff6f03 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service.ts @@ -77,7 +77,12 @@ export class MessagingGmailFullMessageListFetchService { return; } - await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount( + await this.messageChannelRepository.resetThrottleFailureCount( + messageChannel.id, + workspaceId, + ); + + await this.messageChannelRepository.resetSyncStageStartedAt( messageChannel.id, workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts index a00590b05..2f49ded66 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service.ts @@ -12,7 +12,7 @@ import { BlocklistRepository } from 'src/modules/connected-account/repositories/ import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service'; import { MessageChannelWorkspaceEntity, - MessageChannelSyncSubStatus, + MessageChannelSyncStage, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { createQueriesFromMessageIds } from 'src/modules/messaging/message-import-manager/utils/create-queries-from-message-ids.util'; import { filterEmails } from 'src/modules/messaging/message-import-manager/utils/filter-emails.util'; @@ -50,8 +50,8 @@ export class MessagingGmailMessagesImportService { workspaceId: string, ) { if ( - messageChannel.syncSubStatus !== - MessageChannelSyncSubStatus.MESSAGES_IMPORT_PENDING + messageChannel.syncStage !== + MessageChannelSyncStage.MESSAGES_IMPORT_PENDING ) { return; } @@ -137,7 +137,12 @@ export class MessagingGmailMessagesImportService { ); } - await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount( + await this.messageChannelRepository.resetThrottleFailureCount( + messageChannel.id, + workspaceId, + ); + + await this.messageChannelRepository.resetSyncStageStartedAt( messageChannel.id, workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service.ts index 36e86080f..654150d5b 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service.ts @@ -74,7 +74,12 @@ export class MessagingGmailPartialMessageListFetchService { return; } - await this.messageChannelRepository.resetThrottlePauseUntilAndThrottleFailureCount( + await this.messageChannelRepository.resetThrottleFailureCount( + messageChannel.id, + workspaceId, + ); + + await this.messageChannelRepository.resetSyncStageStartedAt( messageChannel.id, workspaceId, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled.ts new file mode 100644 index 000000000..d437d91ac --- /dev/null +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled.ts @@ -0,0 +1,25 @@ +import { MESSAGING_THROTTLE_DURATION } from 'src/modules/messaging/common/constants/messaging-throttle-duration'; + +export const isThrottled = ( + syncStageStartedAt: string | null, + throttleFailureCount: number, +): boolean => { + if (!syncStageStartedAt) { + return false; + } + + return ( + computeThrottlePauseUntil(syncStageStartedAt, throttleFailureCount) > + new Date() + ); +}; + +const computeThrottlePauseUntil = ( + syncStageStartedAt: string, + throttleFailureCount: number, +): Date => { + return new Date( + new Date(syncStageStartedAt).getTime() + + MESSAGING_THROTTLE_DURATION * Math.pow(2, throttleFailureCount - 1), + ); +}; diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts index 13e4fe715..a39672d20 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-message-list-fetch.job.ts @@ -8,11 +8,12 @@ import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/s import { MessageChannelRepository } from 'src/modules/messaging/common/repositories/message-channel.repository'; import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service'; import { - MessageChannelSyncSubStatus, + MessageChannelSyncStage, MessageChannelWorkspaceEntity, } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingGmailFullMessageListFetchService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-full-message-list-fetch.service'; import { MessagingGmailPartialMessageListFetchService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-partial-message-list-fetch.service'; +import { isThrottled } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled'; export type MessagingMessageListFetchJobData = { workspaceId: string; @@ -76,14 +77,16 @@ export class MessagingMessageListFetchJob } if ( - messageChannel.throttlePauseUntil && - messageChannel.throttlePauseUntil > new Date() + isThrottled( + messageChannel.syncStageStartedAt, + messageChannel.throttleFailureCount, + ) ) { return; } - switch (messageChannel.syncSubStatus) { - case MessageChannelSyncSubStatus.PARTIAL_MESSAGE_LIST_FETCH_PENDING: + switch (messageChannel.syncStage) { + case MessageChannelSyncStage.PARTIAL_MESSAGE_LIST_FETCH_PENDING: this.logger.log( `Fetching partial message list for workspace ${workspaceId} and account ${connectedAccount.id}`, ); @@ -110,7 +113,7 @@ export class MessagingMessageListFetchJob break; - case MessageChannelSyncSubStatus.FULL_MESSAGE_LIST_FETCH_PENDING: + case MessageChannelSyncStage.FULL_MESSAGE_LIST_FETCH_PENDING: this.logger.log( `Fetching full message list for workspace ${workspaceId} and account ${connectedAccount.id}`, ); diff --git a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts index 139a0b667..757fbbf41 100644 --- a/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts +++ b/packages/twenty-server/src/modules/messaging/message-import-manager/jobs/messaging-messages-import.job.ts @@ -9,6 +9,7 @@ import { MessageChannelRepository } from 'src/modules/messaging/common/repositor import { MessagingTelemetryService } from 'src/modules/messaging/common/services/messaging-telemetry.service'; import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity'; import { MessagingGmailMessagesImportService } from 'src/modules/messaging/message-import-manager/drivers/gmail/services/messaging-gmail-messages-import.service'; +import { isThrottled } from 'src/modules/messaging/message-import-manager/drivers/gmail/utils/is-throttled'; export type MessagingMessagesImportJobData = { workspaceId: string; @@ -46,8 +47,10 @@ export class MessagingMessagesImportJob }); if ( - messageChannel.throttlePauseUntil && - messageChannel.throttlePauseUntil > new Date() + isThrottled( + messageChannel.syncStageStartedAt, + messageChannel.throttleFailureCount, + ) ) { continue; }