Supabase 완전 정복 시리즈 12편 — Analytics Buckets & Logs: 대용량 분석 워크로드




시리즈 목차 1편 – Supabase란 무엇인가? Firebase와 제대로 비교해보기 … 10편 – MCP 서버 연동 — AI 에이전트와 Supabase 연결하기 11편 – 로컬 개발 환경 & CLI — 프로 개발자의 워크플로우 12편 👉 Analytics Buckets & Logs — 대용량 분석 워크로드 (현재 글) 13편 – 실전: 실시간 채팅 앱 …


들어가며

서비스가 성장하면 반드시 맞닥뜨리는 문제가 있습니다.

  • “우리 앱에서 가장 많이 호출되는 API는 무엇인가?”
  • “지난 3개월간 에러율 추이를 보고 싶다”
  • “사용자 행동 이벤트를 수억 건 저장하고 빠르게 집계하고 싶다”
  • “Postgres 쿼리 중 느린 것을 찾아내고 싶다”

이런 분석(Analytics) 워크로드는 일반 Postgres 테이블과 근본적으로 다른 특성을 가집니다. 수억 건 데이터에 대한 집계·스캔 쿼리는 OLTP(트랜잭션 처리) 데이터베이스의 성능을 크게 저하시킵니다.

Supabase는 이 문제를 두 가지 도구로 해결합니다.

  1. Analytics Buckets (2025년 Public Alpha): Apache Iceberg 기반 컬럼 지향 스토리지
  2. Logs Explorer: Logflare 기반 서비스 로그 조회 및 분석

이번 편에서 다룰 내용:

  • OLTP vs OLAP — 왜 분석에 별도 스토리지가 필요한가
  • Analytics Buckets 개념 및 Apache Iceberg
  • Analytics Bucket 생성 및 데이터 적재
  • Postgres에서 Iceberg 쿼리하기
  • Postgres → Iceberg 실시간 데이터 동기화 (CDC Replication)
  • Logs Explorer 완벽 활용 (edge_logs, postgres_logs, function_logs 등)
  • Next.js에서 로그 분석 대시보드 구축
  • OpenTelemetry 연동

OLTP vs OLAP — 왜 분리해야 하는가

OLTP (Online Transaction Processing)     OLAP (Online Analytical Processing)
─────────────────────────────────────    ─────────────────────────────────────
행(Row) 단위 처리                          열(Column) 단위 처리
빠른 단건 조회/삽입/수정                    대량 집계·스캔
인덱스 중심                               압축 + 컬럼 인코딩
짧은 트랜잭션                             긴 쿼리 (분 단위)
Postgres, MySQL 등                       Parquet, Iceberg, DuckDB 등

일반적인 이벤트 로그 시나리오:

-- ❌ Postgres에서 1억 건 집계 → 수십 초, 프로덕션 부하
SELECT
  DATE_TRUNC('day', created_at) AS day,
  event_type,
  COUNT(*) AS cnt
FROM user_events
WHERE created_at >= NOW() - INTERVAL '90 days'
GROUP BY 1, 2
ORDER BY 1, 3 DESC;

-- ✅ Analytics Bucket (Iceberg/Parquet) → 컬럼만 읽어서 빠름
-- 같은 쿼리가 초 단위로 처리됨

Analytics Buckets란?

Analytics Buckets는 대규모 데이터셋에서 분석 워크플로우를 실행하면서도 기본 데이터베이스를 트랜잭션 작업에 최적화된 상태로 유지할 수 있게 합니다.

Apache Iceberg

Analytics Buckets는 대규모 분석 데이터셋의 효율적 관리를 위해 특별히 설계된 오픈 테이블 포맷인 Apache Iceberg를 활용합니다.

Apache Iceberg의 핵심 특징:

  • 컬럼 지향 Parquet 파일: 필요한 컬럼만 읽어 I/O 최소화
  • 스키마 진화(Schema Evolution): 기존 데이터 영향 없이 컬럼 추가/변경
  • 타임 트래블(Time Travel): 과거 시점의 데이터 조회
  • 파티셔닝: 날짜·카테고리별 파티션으로 스캔 범위 축소
  • ACID 보장: 대용량 데이터도 안전한 읽기/쓰기

⚠️ Public Alpha 안내: Analytics Buckets는 2025년 12월 기준 Public Alpha 상태입니다. 빠른 변경 및 Breaking Change 가능성이 있으므로 프로덕션 핵심 경로에는 주의가 필요합니다.


Analytics Bucket 생성

대시보드에서 생성

1. Supabase 대시보드 → Storage
2. "Create Bucket" 클릭
3. 버킷 이름 입력 (예: user-events)
4. Bucket Type → "Analytics Bucket" 선택
5. Create

SDK로 생성

import { createClient } from '@supabase/supabase-js'

const supabase = createClient(
  process.env.NEXT_PUBLIC_SUPABASE_URL!,
  process.env.SUPABASE_SERVICE_ROLE_KEY!
)

// Analytics Bucket 생성
const { data, error } = await supabase.storage.createBucket('user-events', {
  public: false,
  // @ts-ignore — Analytics Bucket은 아직 타입 정의 진행 중
  bucketType: 'ANALYTICS',
})

데이터 적재 (Ingestion)

Analytics Bucket은 Apache Iceberg REST Catalog API를 통해 데이터를 적재합니다. PyIceberg, Apache Spark, DuckDB 등 Iceberg 호환 클라이언트를 사용합니다.

Python (PyIceberg)

# pip install pyiceberg[s3]

from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField, StringType, TimestampType, LongType, DoubleType
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
import pyarrow as pa
from datetime import datetime

# Iceberg REST Catalog 연결
catalog = RestCatalog(
    name="supabase",
    **{
        "uri": f"{SUPABASE_URL}/storage/v1/iceberg",
        "header.Authorization": f"Bearer {SUPABASE_SERVICE_ROLE_KEY}",
        "warehouse": "user-events",  # 버킷 이름
    }
)

# 스키마 정의
schema = Schema(
    NestedField(1, "event_id",   StringType(),    required=True),
    NestedField(2, "user_id",    StringType(),    required=True),
    NestedField(3, "event_type", StringType(),    required=True),
    NestedField(4, "properties", StringType(),    required=False),  # JSON string
    NestedField(5, "created_at", TimestampType(), required=True),
    NestedField(6, "session_id", StringType(),    required=False),
    NestedField(7, "revenue",    DoubleType(),    required=False),
)

# 날짜 파티셔닝 (일별 파티션)
partition_spec = PartitionSpec(
    PartitionField(
        source_id=5,  # created_at
        field_id=1000,
        transform=DayTransform(),
        name="created_at_day"
    )
)

# 테이블 생성
table = catalog.create_table(
    identifier="analytics.user_events",
    schema=schema,
    partition_spec=partition_spec,
)

# 데이터 삽입
import pyarrow as pa
from datetime import datetime, timezone

records = pa.table({
    "event_id":   ["evt_001", "evt_002", "evt_003"],
    "user_id":    ["user_A", "user_B", "user_A"],
    "event_type": ["page_view", "click", "purchase"],
    "properties": ['{"page":"/home"}', '{"btn":"signup"}', '{"item":"pro_plan"}'],
    "created_at": [
        datetime(2025, 3, 1, 10, 0, tzinfo=timezone.utc),
        datetime(2025, 3, 1, 10, 5, tzinfo=timezone.utc),
        datetime(2025, 3, 1, 10, 8, tzinfo=timezone.utc),
    ],
    "session_id": ["sess_X", "sess_Y", "sess_X"],
    "revenue":    [None, None, 29.0],
})

table.append(records)
print("✅ 데이터 적재 완료")

Edge Function으로 실시간 이벤트 수집

앱에서 발생하는 이벤트를 실시간으로 Analytics Bucket에 기록하는 패턴입니다.

// supabase/functions/track-event/index.ts
import { createClient } from 'jsr:@supabase/supabase-js@2'

interface TrackEventPayload {
  event_type: string
  properties?: Record<string, unknown>
  session_id?: string
  revenue?: number
}

Deno.serve(async (req: Request) => {
  const supabase = createClient(
    Deno.env.get('SUPABASE_URL')!,
    Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
  )

  const user = await supabase.auth.getUser(
    req.headers.get('Authorization')?.replace('Bearer ', '') ?? ''
  )

  const payload: TrackEventPayload = await req.json()

  // Postgres events 테이블에 먼저 저장 (즉시 조회용)
  await supabase.from('events_buffer').insert({
    event_id:   crypto.randomUUID(),
    user_id:    user.data.user?.id ?? 'anonymous',
    event_type: payload.event_type,
    properties: payload.properties ?? {},
    session_id: payload.session_id,
    revenue:    payload.revenue,
    created_at: new Date().toISOString(),
  })

  return Response.json({ ok: true })
})
// Next.js 클라이언트에서 이벤트 트래킹
// src/lib/analytics.ts
import { createClient } from '@/lib/supabase/client'

export async function track(
  eventType: string,
  properties?: Record<string, unknown>,
  revenue?: number
) {
  const supabase = createClient()
  const { data: { session } } = await supabase.auth.getSession()

  await supabase.functions.invoke('track-event', {
    body: {
      event_type: eventType,
      properties,
      revenue,
      session_id: getSessionId(),
    },
  })
}

// 사용 예시
await track('page_view', { page: '/pricing' })
await track('purchase', { plan: 'pro' }, 29.0)
await track('button_click', { button: 'signup_cta' })

Postgres에서 Iceberg 쿼리하기

Analytics Bucket 데이터를 Postgres SQL로 직접 조회할 수 있습니다.

-- Analytics Bucket을 Foreign Table로 마운트
-- (대시보드 또는 마이그레이션에서 실행)
SELECT storage.mount_analytics_bucket('user-events');

-- 이제 일반 SQL로 쿼리 가능
SELECT
  event_type,
  COUNT(*) AS total,
  COUNT(DISTINCT user_id) AS unique_users
FROM analytics.user_events
WHERE created_at >= NOW() - INTERVAL '30 days'
GROUP BY event_type
ORDER BY total DESC;

-- 일별 매출 집계
SELECT
  DATE_TRUNC('day', created_at) AS day,
  SUM(revenue) AS daily_revenue,
  COUNT(*) AS purchase_count
FROM analytics.user_events
WHERE event_type = 'purchase'
  AND created_at >= NOW() - INTERVAL '90 days'
GROUP BY 1
ORDER BY 1;

-- 퍼널 분석: 랜딩 → 회원가입 → 결제 전환율
WITH funnel AS (
  SELECT
    user_id,
    MAX(CASE WHEN event_type = 'page_view' AND properties->>'page' = '/landing' THEN 1 END) AS saw_landing,
    MAX(CASE WHEN event_type = 'signup' THEN 1 END) AS signed_up,
    MAX(CASE WHEN event_type = 'purchase' THEN 1 END) AS purchased
  FROM analytics.user_events
  WHERE created_at >= NOW() - INTERVAL '30 days'
  GROUP BY user_id
)
SELECT
  COUNT(*) FILTER (WHERE saw_landing = 1) AS step1_landing,
  COUNT(*) FILTER (WHERE saw_landing = 1 AND signed_up = 1) AS step2_signup,
  COUNT(*) FILTER (WHERE saw_landing = 1 AND signed_up = 1 AND purchased = 1) AS step3_purchase,
  ROUND(
    COUNT(*) FILTER (WHERE saw_landing = 1 AND signed_up = 1)::numeric
    / NULLIF(COUNT(*) FILTER (WHERE saw_landing = 1), 0) * 100, 2
  ) AS landing_to_signup_rate,
  ROUND(
    COUNT(*) FILTER (WHERE signed_up = 1 AND purchased = 1)::numeric
    / NULLIF(COUNT(*) FILTER (WHERE signed_up = 1), 0) * 100, 2
  ) AS signup_to_purchase_rate
FROM funnel;

Postgres → Iceberg CDC 실시간 복제

Supabase는 Postgres에서 외부 대상(Iceberg부터 시작)으로 데이터를 지속적으로 복제하는 CDC(Change Data Capture) 파이프라인을 제공합니다(현재 Private Alpha).

이를 통해 Postgres 테이블 변경 내용이 자동으로 Analytics Bucket에 동기화됩니다.

-- Replication 설정 (대시보드 Storage → Analytics → Replication)
-- 또는 SQL로 직접 설정

-- 복제할 테이블 publication 등록
ALTER PUBLICATION supabase_realtime ADD TABLE orders;
ALTER PUBLICATION supabase_realtime ADD TABLE user_events;

CDC 복제가 설정되면:

Postgres 테이블 INSERT/UPDATE/DELETE
          ↓
  Change Data Capture
          ↓
  Analytics Bucket (Iceberg)
          ↓
  SQL로 조회 가능 (분 단위 지연)

Logs Explorer 완벽 활용

Supabase는 모든 서비스 로그를 Logflare 기반으로 수집하며, API 게이트웨이, Postgres 데이터베이스, Storage, Edge Functions 등 다양한 서비스의 로그 이벤트를 추적·분석합니다.

로그 소스 종류

Logs Explorer는 각 로그 소스를 별도 테이블로 노출합니다: auth_logs(인증 활동), edge_logs(엣지 네트워크 요청/응답), function_edge_logs(Edge Function 네트워크 로그), function_logs(Edge Function 내부 로그), postgres_logs(DB 쿼리), realtime_logs(Realtime 연결), storage_logs(스토리지 업로드/조회).

edge_logs — API 요청 분석

-- 최근 1시간 API 에러 TOP 10
SELECT
  DATETIME(timestamp) AS time,
  event_message,
  status_code,
  request_url
FROM
  edge_logs
  CROSS JOIN UNNEST(metadata) AS metadata
  CROSS JOIN UNNEST(metadata.response) AS response
  CROSS JOIN UNNEST(metadata.request) AS request
WHERE
  timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
  AND status_code >= 400
ORDER BY timestamp DESC
LIMIT 100;

-- 엔드포인트별 호출 수 (최근 24시간)
SELECT
  REGEXP_EXTRACT(request_url, r'/rest/v1/([^?]+)') AS table_name,
  COUNT(*) AS request_count,
  COUNTIF(status_code >= 400) AS error_count,
  ROUND(AVG(response_time_ms), 2) AS avg_response_ms
FROM
  edge_logs
  CROSS JOIN UNNEST(metadata) AS metadata
  CROSS JOIN UNNEST(metadata.response) AS response
  CROSS JOIN UNNEST(metadata.request) AS request
WHERE
  timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY table_name
ORDER BY request_count DESC;

postgres_logs — 느린 쿼리 찾기

-- 실행 시간이 1초 이상인 쿼리
SELECT
  DATETIME(timestamp) AS time,
  event_message,
  duration_ms,
  query
FROM
  postgres_logs
  CROSS JOIN UNNEST(metadata) AS metadata
  CROSS JOIN UNNEST(metadata.parsed) AS parsed
WHERE
  timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
  AND duration_ms > 1000
ORDER BY duration_ms DESC
LIMIT 50;

-- 에러 메시지 TOP 10
SELECT
  error_severity,
  REGEXP_EXTRACT(event_message, r'^[^:]+') AS error_type,
  COUNT(*) AS count
FROM postgres_logs
  CROSS JOIN UNNEST(metadata) AS metadata
  CROSS JOIN UNNEST(metadata.parsed) AS parsed
WHERE
  timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
  AND error_severity IN ('ERROR', 'FATAL', 'PANIC')
GROUP BY 1, 2
ORDER BY count DESC
LIMIT 10;

function_logs — Edge Function 디버깅

-- 특정 함수의 로그 조회
SELECT
  DATETIME(timestamp) AS time,
  event_message,
  level
FROM function_logs
WHERE
  timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 HOUR)
  AND function_id = 'send-email'
ORDER BY timestamp DESC;

-- 함수별 실행 시간 통계
SELECT
  function_id,
  COUNT(*) AS invocations,
  ROUND(AVG(execution_time_ms), 2) AS avg_ms,
  MAX(execution_time_ms) AS max_ms,
  COUNTIF(status_code >= 400) AS errors
FROM function_edge_logs
  CROSS JOIN UNNEST(metadata) AS metadata
WHERE
  timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY function_id
ORDER BY invocations DESC;

Next.js 로그 분석 대시보드

Logs Explorer 쿼리를 API로 감싸 Next.js 대시보드에서 실시간 모니터링을 구현합니다.

// src/app/api/analytics/api-stats/route.ts
import { createClient } from '@/lib/supabase/server'
import { NextResponse } from 'next/server'

export async function GET(req: Request) {
  const supabase = await createClient()
  const { searchParams } = new URL(req.url)
  const hours = Number(searchParams.get('hours') ?? 24)

  // Supabase 관리 API로 Logs Explorer 쿼리 실행
  const { data, error } = await supabase.rpc('get_api_stats', {
    hours_back: hours,
  })

  if (error) return NextResponse.json({ error: error.message }, { status: 500 })
  return NextResponse.json(data)
}
-- DB 함수로 로그 통계를 Postgres 내에서 집계
-- (postgres_logs는 Postgres 백엔드 사용 시 _analytics 스키마에 존재)
CREATE OR REPLACE FUNCTION get_api_stats(hours_back INT DEFAULT 24)
RETURNS TABLE (
  hour          TIMESTAMPTZ,
  request_count BIGINT,
  error_count   BIGINT,
  avg_duration  FLOAT
) AS $$
BEGIN
  RETURN QUERY
  SELECT
    DATE_TRUNC('hour', created_at) AS hour,
    COUNT(*) AS request_count,
    COUNT(*) FILTER (WHERE status_code >= 400) AS error_count,
    AVG(duration_ms) AS avg_duration
  FROM _analytics.edge_logs_recent  -- 로컬 개발 기준
  WHERE created_at >= NOW() - (hours_back || ' hours')::INTERVAL
  GROUP BY 1
  ORDER BY 1;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
// src/app/(dashboard)/analytics/page.tsx
'use client'
import { useEffect, useState } from 'react'
import {
  LineChart, Line, XAxis, YAxis, CartesianGrid,
  Tooltip, Legend, ResponsiveContainer, BarChart, Bar
} from 'recharts'

interface ApiStat {
  hour: string
  request_count: number
  error_count: number
  avg_duration: number
}

export default function AnalyticsDashboard() {
  const [stats, setStats] = useState<ApiStat[]>([])
  const [loading, setLoading] = useState(true)

  useEffect(() => {
    fetch('/api/analytics/api-stats?hours=24')
      .then(r => r.json())
      .then(data => {
        setStats(data)
        setLoading(false)
      })
  }, [])

  if (loading) return <div className="p-8">로딩 중...</div>

  const totalRequests = stats.reduce((s, d) => s + d.request_count, 0)
  const totalErrors = stats.reduce((s, d) => s + d.error_count, 0)
  const errorRate = totalRequests > 0
    ? ((totalErrors / totalRequests) * 100).toFixed(2)
    : '0'

  return (
    <div className="p-8 space-y-8">
      <h1 className="text-2xl font-bold">API 분석 대시보드</h1>

      {/* KPI 카드 */}
      <div className="grid grid-cols-3 gap-4">
        <div className="bg-white rounded-xl shadow p-6">
          <p className="text-sm text-gray-500">총 요청 수 (24h)</p>
          <p className="text-3xl font-bold">{totalRequests.toLocaleString()}</p>
        </div>
        <div className="bg-white rounded-xl shadow p-6">
          <p className="text-sm text-gray-500">에러 수 (24h)</p>
          <p className="text-3xl font-bold text-red-500">{totalErrors.toLocaleString()}</p>
        </div>
        <div className="bg-white rounded-xl shadow p-6">
          <p className="text-sm text-gray-500">에러율</p>
          <p className="text-3xl font-bold text-orange-500">{errorRate}%</p>
        </div>
      </div>

      {/* 시간대별 요청/에러 추이 */}
      <div className="bg-white rounded-xl shadow p-6">
        <h2 className="text-lg font-semibold mb-4">시간대별 API 요청</h2>
        <ResponsiveContainer width="100%" height={300}>
          <LineChart data={stats}>
            <CartesianGrid strokeDasharray="3 3" />
            <XAxis
              dataKey="hour"
              tickFormatter={v => new Date(v).getHours() + '시'}
            />
            <YAxis />
            <Tooltip
              labelFormatter={v => new Date(v).toLocaleString('ko-KR')}
            />
            <Legend />
            <Line
              type="monotone"
              dataKey="request_count"
              name="요청 수"
              stroke="#3b82f6"
              strokeWidth={2}
            />
            <Line
              type="monotone"
              dataKey="error_count"
              name="에러 수"
              stroke="#ef4444"
              strokeWidth={2}
            />
          </LineChart>
        </ResponsiveContainer>
      </div>

      {/* 평균 응답 시간 */}
      <div className="bg-white rounded-xl shadow p-6">
        <h2 className="text-lg font-semibold mb-4">시간대별 평균 응답 시간 (ms)</h2>
        <ResponsiveContainer width="100%" height={250}>
          <BarChart data={stats}>
            <CartesianGrid strokeDasharray="3 3" />
            <XAxis
              dataKey="hour"
              tickFormatter={v => new Date(v).getHours() + '시'}
            />
            <YAxis />
            <Tooltip
              formatter={(v: number) => [`${v.toFixed(1)}ms`, '평균 응답 시간']}
            />
            <Bar dataKey="avg_duration" name="평균 응답(ms)" fill="#8b5cf6" />
          </BarChart>
        </ResponsiveContainer>
      </div>
    </div>
  )
}

OpenTelemetry 연동

OpenTelemetry 통합을 통해 로그, 메트릭, 트레이스를 Datadog, Honeycomb, Grafana 등 OTel 호환 도구로 내보낼 수 있으며, Metrics API는 CPU, IO, WAL, 연결 수, 쿼리 통계를 포함한 약 200개의 Prometheus 호환 Postgres 메트릭을 제공합니다.

# Grafana + Prometheus 연동 예시 (docker-compose.yml)
services:
  prometheus:
    image: prom/prometheus
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3001:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
# prometheus.yml
scrape_configs:
  - job_name: supabase
    static_configs:
      - targets: ['api.supabase.com']
    metrics_path: /customer/v1/privileged/metrics
    authorization:
      credentials: ${SUPABASE_SERVICE_ROLE_KEY}
    scheme: https

언제 무엇을 사용할까?

상황권장 도구
서비스 로그 즉시 조회Logs Explorer
느린 쿼리 디버깅Logs Explorer → postgres_logs
Edge Function 에러 추적Logs Explorer → function_logs
사용자 이벤트 대용량 저장Analytics Bucket
BI 도구 연동 (Metabase, Tableau)Analytics Bucket → Iceberg
Postgres 데이터 분석 복제CDC Replication → Analytics Bucket
인프라 모니터링OpenTelemetry + Grafana
앱 내 실시간 KPIPostgres 집계 뷰 + Realtime

마치며

Analytics Buckets와 Logs Explorer는 Supabase가 단순 BaaS를 넘어 데이터 플랫폼으로 진화하고 있음을 보여줍니다. Apache Iceberg 기반의 컬럼 지향 스토리지로 수억 건 이벤트를 저렴하게 보관하고, SQL로 직접 분석하는 경험은 기존에 별도 데이터 웨어하우스가 필요했던 작업을 Supabase 하나로 해결해 줍니다.

다음 편부터는 실전 프로젝트 시리즈가 시작됩니다. 13편에서는 Realtime, Auth, RLS를 모두 활용한 실시간 채팅 앱을 Next.js App Router로 처음부터 끝까지 구축합니다.




댓글 남기기