Skip to main content

@tuturuuu/trigger

The @tuturuuu/trigger package provides background job processing capabilities using Trigger.dev, enabling long-running tasks like calendar synchronization, batch operations, and scheduled workflows.

Installation

# Already included in monorepo workspace
import { googleCalendarSync } from '@tuturuuu/trigger';

Available Jobs

Google Calendar Sync

The platform provides three types of calendar synchronization jobs:

1. Full Sync

Complete synchronization of all calendar events.
// packages/trigger/google-calendar-full-sync.ts
import { task } from '@trigger.dev/sdk/v3';

export const googleCalendarFullSync = task({
  id: 'google-calendar-full-sync',
  run: async (payload: { wsId: string; userId: string }) => {
    // Implementation
  },
});
Use Cases:
  • Initial calendar setup
  • Recovery from sync errors
  • Manual refresh requested by user
Trigger:
import { googleCalendarFullSync } from '@tuturuuu/trigger';

await googleCalendarFullSync.trigger({
  wsId: 'workspace-id',
  userId: 'user-id',
});

2. Incremental Sync

Efficient delta synchronization using Google’s sync tokens.
// packages/trigger/google-calendar-incremental-sync.ts
import { task } from '@trigger.dev/sdk/v3';

export const googleCalendarIncrementalSync = task({
  id: 'google-calendar-incremental-sync',
  run: async (payload: { wsId: string; userId: string }) => {
    // Implementation using sync tokens
  },
});
How It Works:
  1. Retrieves last sync token from calendar_sync_states
  2. Fetches only changed events since last sync
  3. Updates database with changes
  4. Stores new sync token for next incremental sync
Use Cases:
  • Regular background sync (every 5-15 minutes)
  • Webhook-triggered updates
  • Real-time calendar updates
Trigger:
import { googleCalendarIncrementalSync } from '@tuturuuu/trigger';

await googleCalendarIncrementalSync.trigger({
  wsId: 'workspace-id',
  userId: 'user-id',
});

3. Batched Sync

Batch synchronization for multiple workspaces.
// packages/trigger/google-calendar-sync.ts
import { task } from '@trigger.dev/sdk/v3';

export const googleCalendarBatchSync = task({
  id: 'google-calendar-batch-sync',
  run: async (payload: { workspaces: Array<{ wsId: string; userId: string }> }) => {
    // Batch processing implementation
  },
});
Use Cases:
  • Scheduled background sync for all workspaces
  • System-wide calendar refresh
  • Admin-initiated bulk sync
Trigger:
import { googleCalendarBatchSync } from '@tuturuuu/trigger';

await googleCalendarBatchSync.trigger({
  workspaces: [
    { wsId: 'ws-1', userId: 'user-1' },
    { wsId: 'ws-2', userId: 'user-2' },
  ],
});

Task Scheduling

Schedule tasks based on due dates and priorities.
// packages/trigger/schedule-tasks.ts
import { task } from '@trigger.dev/sdk/v3';

export const scheduleTasks = task({
  id: 'schedule-tasks',
  run: async (payload: { wsId: string }) => {
    // Auto-schedule tasks into calendar
  },
});
Features:
  • Analyzes task due dates and priorities
  • Finds available time slots in calendar
  • Creates calendar events for high-priority tasks
  • Respects work hours and existing commitments
Trigger:
import { scheduleTasks } from '@tuturuuu/trigger';

await scheduleTasks.trigger({
  wsId: 'workspace-id',
});

Calendar Sync Implementation

Sync Coordination

The package uses atomic sync state management to prevent concurrent syncs:
import { createClient } from '@tuturuuu/supabase/server';

async function acquireSyncLock(wsId: string): Promise<boolean> {
  const supabase = createClient();

  // Check if already syncing
  const { data: state } = await supabase
    .from('calendar_sync_states')
    .select('is_syncing')
    .eq('ws_id', wsId)
    .single();

  if (state?.is_syncing) {
    return false; // Another sync in progress
  }

  // Acquire lock
  const { error } = await supabase
    .from('calendar_sync_states')
    .update({ is_syncing: true })
    .eq('ws_id', wsId);

  return !error;
}

async function releaseSyncLock(wsId: string) {
  const supabase = createClient();

  await supabase
    .from('calendar_sync_states')
    .update({ is_syncing: false })
    .eq('ws_id', wsId);
}

Full Sync Flow

import { task } from '@trigger.dev/sdk/v3';
import { google } from 'googleapis';
import { createAdminClient } from '@tuturuuu/supabase/server';

export const googleCalendarFullSync = task({
  id: 'google-calendar-full-sync',
  maxDuration: 300, // 5 minutes
  run: async (payload: { wsId: string; userId: string }) => {
    const supabase = createAdminClient();

    // Acquire sync lock
    const lockAcquired = await acquireSyncLock(payload.wsId);
    if (!lockAcquired) {
      throw new Error('Sync already in progress');
    }

    try {
      // Get OAuth tokens
      const { data: tokens } = await supabase
        .from('calendar_auth_tokens')
        .select('*')
        .eq('user_id', payload.userId)
        .single();

      if (!tokens) throw new Error('No calendar tokens found');

      // Initialize Google Calendar API
      const oauth2Client = new google.auth.OAuth2();
      oauth2Client.setCredentials({
        access_token: tokens.access_token,
        refresh_token: tokens.refresh_token,
      });

      const calendar = google.calendar({ version: 'v3', auth: oauth2Client });

      // Fetch all events
      const response = await calendar.events.list({
        calendarId: 'primary',
        maxResults: 2500,
        singleEvents: true,
        orderBy: 'startTime',
      });

      const events = response.data.items || [];

      // Log sync start
      const { data: syncLog } = await supabase
        .from('workspace_calendar_sync_log')
        .insert({
          ws_id: payload.wsId,
          sync_type: 'full',
          started_at: new Date().toISOString(),
        })
        .select()
        .single();

      // Batch upsert events
      for (let i = 0; i < events.length; i += 100) {
        const batch = events.slice(i, i + 100);

        await supabase
          .from('workspace_calendar_events')
          .upsert(
            batch.map((event) => ({
              ws_id: payload.wsId,
              google_event_id: event.id,
              title: event.summary,
              description: event.description,
              start_at: event.start?.dateTime || event.start?.date,
              end_at: event.end?.dateTime || event.end?.date,
              location: event.location,
              creator_id: payload.userId,
            })),
            {
              onConflict: 'google_event_id',
            }
          );
      }

      // Update sync token
      await supabase
        .from('calendar_sync_states')
        .upsert({
          ws_id: payload.wsId,
          sync_token: response.data.nextSyncToken,
          last_synced_at: new Date().toISOString(),
        });

      // Log sync completion
      await supabase
        .from('workspace_calendar_sync_log')
        .update({
          completed_at: new Date().toISOString(),
        })
        .eq('id', syncLog?.id);

      return {
        success: true,
        eventsProcessed: events.length,
      };
    } catch (error) {
      // Log error
      await supabase
        .from('workspace_calendar_sync_log')
        .update({
          completed_at: new Date().toISOString(),
          error: (error as Error).message,
        });

      throw error;
    } finally {
      // Always release lock
      await releaseSyncLock(payload.wsId);
    }
  },
});

Incremental Sync Flow

import { task } from '@trigger.dev/sdk/v3';
import { google } from 'googleapis';
import { createAdminClient } from '@tuturuuu/supabase/server';

export const googleCalendarIncrementalSync = task({
  id: 'google-calendar-incremental-sync',
  maxDuration: 60, // 1 minute
  run: async (payload: { wsId: string; userId: string }) => {
    const supabase = createAdminClient();

    const lockAcquired = await acquireSyncLock(payload.wsId);
    if (!lockAcquired) return { skipped: true };

    try {
      // Get sync token
      const { data: syncState } = await supabase
        .from('calendar_sync_states')
        .select('sync_token')
        .eq('ws_id', payload.wsId)
        .single();

      if (!syncState?.sync_token) {
        // No sync token - trigger full sync instead
        await googleCalendarFullSync.trigger(payload);
        return { redirectedToFullSync: true };
      }

      // Get OAuth tokens
      const { data: tokens } = await supabase
        .from('calendar_auth_tokens')
        .select('*')
        .eq('user_id', payload.userId)
        .single();

      if (!tokens) throw new Error('No calendar tokens found');

      const oauth2Client = new google.auth.OAuth2();
      oauth2Client.setCredentials({
        access_token: tokens.access_token,
        refresh_token: tokens.refresh_token,
      });

      const calendar = google.calendar({ version: 'v3', auth: oauth2Client });

      // Fetch changes since last sync
      const response = await calendar.events.list({
        calendarId: 'primary',
        syncToken: syncState.sync_token,
      });

      const events = response.data.items || [];

      // Process changes
      for (const event of events) {
        if (event.status === 'cancelled') {
          // Delete event
          await supabase
            .from('workspace_calendar_events')
            .delete()
            .eq('google_event_id', event.id);
        } else {
          // Upsert event
          await supabase
            .from('workspace_calendar_events')
            .upsert({
              ws_id: payload.wsId,
              google_event_id: event.id,
              title: event.summary,
              description: event.description,
              start_at: event.start?.dateTime || event.start?.date,
              end_at: event.end?.dateTime || event.end?.date,
              location: event.location,
              creator_id: payload.userId,
            });
        }
      }

      // Update sync token
      await supabase
        .from('calendar_sync_states')
        .update({
          sync_token: response.data.nextSyncToken,
          last_synced_at: new Date().toISOString(),
        })
        .eq('ws_id', payload.wsId);

      return {
        success: true,
        changesProcessed: events.length,
      };
    } catch (error) {
      if ((error as any).code === 410) {
        // Sync token expired - trigger full sync
        await googleCalendarFullSync.trigger(payload);
        return { syncTokenExpired: true };
      }
      throw error;
    } finally {
      await releaseSyncLock(payload.wsId);
    }
  },
});

Scheduled Jobs

Daily Calendar Sync

// apps/web/src/trigger/scheduled.ts
import { schedules } from '@trigger.dev/sdk/v3';
import { googleCalendarIncrementalSync } from '@tuturuuu/trigger';
import { createAdminClient } from '@tuturuuu/supabase/server';

export const dailyCalendarSync = schedules.task({
  id: 'daily-calendar-sync',
  cron: '0 */6 * * *', // Every 6 hours
  run: async () => {
    const supabase = createAdminClient();

    // Get all workspaces with calendar enabled
    const { data: workspaces } = await supabase
      .from('workspace_secrets')
      .select('ws_id, user_id')
      .eq('name', 'ENABLE_CALENDAR')
      .eq('value', 'true');

    if (!workspaces) return;

    // Trigger incremental sync for each workspace
    for (const ws of workspaces) {
      await googleCalendarIncrementalSync.trigger({
        wsId: ws.ws_id,
        userId: ws.user_id,
      });
    }

    return { workspacesProcessed: workspaces.length };
  },
});

Development

Local Development

# Start Trigger.dev dev server
bun trigger:dev
This starts a local Trigger.dev instance that watches for job changes and allows testing.

Testing Jobs Locally

import { googleCalendarFullSync } from '@tuturuuu/trigger';

// Trigger a test run
const run = await googleCalendarFullSync.trigger({
  wsId: 'test-workspace',
  userId: 'test-user',
});

console.log('Run ID:', run.id);

Deployment

Deploy Jobs to Production

bun trigger:deploy
This deploys all background jobs to Trigger.dev cloud.

Environment Variables

Required in .env:
TRIGGER_SECRET_KEY=your_trigger_secret_key

Monitoring

View Job Runs

Access the Trigger.dev dashboard to monitor:
  • Job execution history
  • Success/failure rates
  • Execution duration
  • Error logs
  • Retry attempts

Error Handling

export const myJob = task({
  id: 'my-job',
  retry: {
    maxAttempts: 3,
    minTimeoutInMs: 1000,
    maxTimeoutInMs: 10000,
    factor: 2,
  },
  run: async (payload) => {
    // Job implementation
  },
});

Best Practices

✅ DO

  1. Use appropriate sync types
    // Initial setup: Full sync
    await googleCalendarFullSync.trigger({ wsId, userId });
    
    // Regular updates: Incremental sync
    await googleCalendarIncrementalSync.trigger({ wsId, userId });
    
  2. Implement sync locks
    const lockAcquired = await acquireSyncLock(wsId);
    if (!lockAcquired) return { skipped: true };
    
  3. Log sync operations
    await supabase.from('workspace_calendar_sync_log').insert({
      ws_id: wsId,
      sync_type: 'incremental',
      started_at: new Date().toISOString(),
    });
    
  4. Set appropriate max durations
    maxDuration: 300, // 5 minutes for full sync
    maxDuration: 60,  // 1 minute for incremental sync
    
  5. Handle token expiration
    if (error.code === 410) {
      // Trigger full sync to get new token
    }
    

❌ DON’T

  1. Don’t skip lock acquisition
    // ❌ Bad: Concurrent syncs can corrupt data
    
  2. Don’t use user client for background jobs
    // ❌ Bad
    const supabase = createClient();
    
    // ✅ Good
    const supabase = createAdminClient();
    
  3. Don’t ignore sync errors
    // ❌ Bad: Silent failure
    // ✅ Good: Log to sync_log table
    

Future Jobs

Potential background jobs to implement:
  • Email processing and AI summarization
  • Batch task creation from templates
  • Automated report generation
  • Data export and backup
  • Workspace analytics calculation