Change Stream
본 문서는 Juju 2.9, 3.6, 4.0 코드베이스 분석을 기반으로 AI가 정리했습니다.
Overview
Change Stream은 데이터베이스 변경을 감지하여 관심 있는 워커에게 실시간으로 전달하는 Juju의 핵심 인프라다. juju status가 보여주는 상태 변화, charm hook 트리거, 리더십 전환 등 Juju의 모든 반응형 동작이 이 시스템 위에서 동작한다.
버전별로 근본적인 아키텍처 변화를 거쳤다:
| Version | Backend | 변경 감지 방식 | 지연 시간 |
|---|---|---|---|
| 2.9 | MongoDB | txns.log MongoDB의 mgo/txn 라이브러리가 관리하는 capped collection. 모든 트랜잭션의 변경 기록이 여기에 쌓이며, 크기 제한에 도달하면 오래된 항목이 자동 삭제된다. 폴링 | 10ms ~ 5s |
| 3.6 | MongoDB | Change Streams MongoDB 3.6+에서 도입된 기능. oplog를 추상화하여 컬렉션 변경을 실시간 스트림으로 제공한다. 폴링 없이 이벤트 기반으로 동작하여 지연이 크게 줄었다. | sub-second |
| 4.0 | dqlite | SQL 트리거 테이블에 INSERT/UPDATE/DELETE가 발생하면 자동으로 change_log 테이블에 기록하는 SQL 트리거. 애플리케이션 코드 수정 없이 모든 변경을 캡처한다. + 폴링 | 100ms ~ 10s |
Juju 2.9: MongoDB txns.log 폴링
아키텍처
2.9에서는 MongoDB가 유일한 데이터베이스였다. 변경 감지는 mgo/txn 라이브러리가 관리하는 **txns.log capped collection MongoDB의 고정 크기 컬렉션. 생성 시 최대 크기를 지정하고, 꽉 차면 가장 오래된 문서를 자동으로 덮어쓴다(원형 버퍼). 삽입 순서가 보장되어 별도 인덱스 없이 빠른 순차 읽기가 가능하지만, 개별 문서 삭제나 크기 변경이 불가능하다. **을 역순으로 폴링하는 방식이었다.
flowchart LR
DB["MongoDB"] --> TL["txns.log\n(capped collection)"]
TL -->|"역순 폴링"| TW["TxnWatcher"]
TW -->|"hub.Publish()"| Hub["pubsub.SimpleHub"]
Hub -->|"SubscribeMatch()"| HW1["HubWatcher 1"]
Hub -->|"SubscribeMatch()"| HW2["HubWatcher 2"]
HW1 -->|"채널 전송"| W1["Watcher들"]
HW2 -->|"채널 전송"| W2["Watcher들"]
TxnWatcher: 폴링 메커니즘
파일: state/watcher/txnwatcher.go
TxnWatcher는 controller당 하나만 실행되며, txns.log를 폴링하여 변경을 감지한다.
폴링 전략 (Exponential Backoff):
Initial: 10ms // 변경 감지 직후 빠르게 재폴링
Factor: 1.5x // 변경 없으면 간격 증가
MaxDelay: 5s // 최대 대기 시간
변경이 감지되면 즉시 10ms로 리셋하여 연쇄 변경을 빠르게 포착했다.
변경 감지 알고리즘
txns.log의 각 항목은 트랜잭션에서 변경된 컬렉션과 문서 ID를 기록한다:
{
_id: <transaction_id>,
"machines": {
d: ["machine-0", "machine-1"], // 변경된 문서 ID
r: [3, 5] // txn-revno (-값이면 삭제)
},
"units": {
d: ["mysql/0"],
r: [2]
}
}
sync() 알고리즘:
txns.log를 **역순(-$natural)**으로 순회- 이전에 처리한
lastId에 도달하면 중단 - 같은 문서가 여러 트랜잭션에 나타나면 최신 것만 채택 (중복 제거)
- 각 변경을 Hub Juju 내부에서 사용하는 경량 pub/sub 시스템. TxnWatcher가 변경을 발행하면 각 State 인스턴스의 HubWatcher가 구독하여 수신한다. 로 발행
HubWatcher: 이벤트 분배
파일: state/watcher/hubwatcher.go
Hub를 구독하여 변경 이벤트를 받고, 등록된 Watcher들에게 분배한다:
| Watch 유형 | 설명 |
|---|---|
| Per-Document | 특정 문서 ID의 변경만 수신 |
| Per-Collection | 컬렉션 내 모든 문서 변경 수신 |
| Filtered Collection | 필터 함수를 적용하여 선택적 수신 |
한계
- Capped Collection 오버플로:
txns.log가 가득 차서 읽기 위치를 잃으면 CappedPositionLost MongoDB capped collection에서 커서 위치의 데이터가 새 데이터에 의해 덮어쓰여졌을 때 발생하는 에러(코드 136). TxnWatcher가 이 에러를 받으면 에이전트 전체를 재시작해야 한다. 에러 발생 → 에이전트 전체 재시작 - 폴링 지연: 최대 5초의 감지 지연
- 단일 장애점: TxnWatcher 실패 시 모든 Watcher가 영향을 받음
Juju 3.6: MongoDB Change Streams
아키텍처 변경
3.6에서는 MongoDB의 Change Streams API를 도입하여 폴링을 이벤트 기반으로 교체했다. 동시에 dqlite가 도입되기 시작했지만, 변경 감지는 아직 MongoDB 기반이었다.
flowchart LR
DB["MongoDB"] --> CS["Change Streams\n(oplog 추상화)"]
CS -->|"이벤트 스트림"| TW["TxnWatcher"]
TW -->|"hub.Publish()"| Hub["pubsub.SimpleHub"]
Hub --> HW["HubWatcher"]
HW --> W["Watcher들"]
주요 개선점
| 항목 | 2.9 (폴링) | 3.6 (Change Streams) |
|---|---|---|
| 감지 방식 | capped collection 폴링 | oplog 기반 스트리밍 MongoDB의 oplog(operation log)를 기반으로 동작하는 실시간 변경 스트림. 서버가 변경을 push하므로 클라이언트가 폴링할 필요가 없다. |
| 지연 시간 | 10ms ~ 5s | sub-second |
| 오버플로 위험 | CappedPositionLost 가능 | resumeToken Change Streams에서 커서 위치를 기억하는 토큰. 연결이 끊겨도 이 토큰으로 마지막 위치부터 재개할 수 있어 데이터 유실을 방지한다. 으로 안전한 재개 |
| DB 부하 | 주기적 쿼리 부하 | 이벤트 기반, 유휴 시 부하 없음 |
과도기 구조
3.6은 MongoDB와 dqlite가 공존하는 과도기였다:
- 변경 감지: MongoDB Change Streams (기존)
- Lease 저장: dqlite로 이전 완료
- 상태 데이터: MongoDB에서 dqlite로 이전 중
TxnWatcher → Hub → HubWatcher 파이프라인은 유지되었고, TxnWatcher 내부만 폴링에서 Change Streams로 교체되었다.
Juju 4.0: dqlite Change Stream
아키텍처
4.0에서는 MongoDB가 완전히 제거되고, dqlite 기반의 새로운 Change Stream 시스템이 도입되었다. SQL 트리거로 변경을 캡처하고, 멀티플렉서로 구독자에게 분배하는 구조다.
flowchart TB
subgraph DB["dqlite Database"]
T["테이블 변경\n(INSERT/UPDATE/DELETE)"]
TR["SQL Trigger"]
CL["change_log 테이블"]
T -->|"AFTER trigger"| TR
TR -->|"INSERT"| CL
end
subgraph Worker["changestream Worker"]
S["Stream\n(폴링 루프)"]
EM["EventMultiplexer"]
CL -->|"SELECT WHERE id > ?"| S
S -->|"Term"| EM
end
subgraph Subs["Subscriptions"]
S1["Sub 1\n(machine)"]
S2["Sub 2\n(unit)"]
S3["Sub 3\n(all)"]
EM --> S1
EM --> S2
EM --> S3
end
subgraph Watchers["Watchers"]
NW1["NamespaceWatcher"]
NW2["NotifyWatcher"]
S1 --> NW1
S2 --> NW2
end
1단계: SQL 트리거로 변경 캡처
스키마: domain/schema/controller/sql/0003-changelog.sql
모니터링 대상 테이블마다 3개의 트리거가 자동 생성된다:
-- INSERT 트리거
CREATE TRIGGER trg_log_machine_insert
AFTER INSERT ON machine FOR EACH ROW
BEGIN
INSERT INTO change_log (edit_type_id, namespace_id, changed, created_at)
VALUES (1, 2, NEW.uuid, DATETIME('now', 'utc'));
END;
-- UPDATE 트리거
CREATE TRIGGER trg_log_machine_update
AFTER UPDATE ON machine FOR EACH ROW
BEGIN
INSERT INTO change_log (edit_type_id, namespace_id, changed, created_at)
VALUES (2, 2, OLD.uuid, DATETIME('now', 'utc'));
END;
-- DELETE 트리거
CREATE TRIGGER trg_log_machine_delete
AFTER DELETE ON machine FOR EACH ROW
BEGIN
INSERT INTO change_log (edit_type_id, namespace_id, changed, created_at)
VALUES (4, 2, OLD.uuid, DATETIME('now', 'utc'));
END;
change_log 테이블 구조:
| 컬럼 | 타입 | 설명 |
|---|---|---|
id | INT AUTOINCREMENT | 단조 증가 시퀀스 (순서 보장) |
edit_type_id | INT | 1=CREATE, 2=UPDATE, 4=DELETE (비트 플래그) |
namespace_id | INT | 테이블 이름 (FK → change_log_namespace) |
changed | TEXT | 변경된 행의 PK (예: UUID) |
created_at | DATETIME | UTC 타임스탬프 |
2단계: Stream 폴링
파일: internal/changestream/stream/stream.go
Stream 워커가 주기적으로 change_log를 폴링한다:
SELECT MAX(c.id), c.edit_type_id, n.namespace, c.changed, c.created_at
FROM change_log c
JOIN change_log_edit_type t ON c.edit_type_id = t.id
JOIN change_log_namespace n ON c.namespace_id = n.id
WHERE c.id > ? -- 마지막 처리 위치 이후
GROUP BY c.namespace_id, c.changed -- 같은 엔티티 변경은 합침
ORDER BY c.id;
핵심 설계:
GROUP BY namespace_id, changed: 같은 엔티티에 대한 여러 변경을 하나로 합침 ( coalescing 같은 엔티티에 대해 짧은 시간 내에 여러 변경이 발생하면 하나의 이벤트로 합치는 최적화. 예: 10ms 내에 3번 UPDATE된 machine은 1개의 UPDATE 이벤트만 발생한다. )MAX(c.id): 합쳐진 변경 중 최신 시퀀스 번호를 유지- 폴링 간격: 100ms (변경 있을 때) ~ 10s (변경 없을 때, exponential backoff)
폴링 결과는 Term 하나의 폴링 사이클에서 수집된 변경 이벤트 묶음. Term이 모든 구독자에게 전달 완료되어야 다음 Term을 처리하여 순서를 보장한다. 으로 묶여 EventMultiplexer에 전달된다.
3단계: EventMultiplexer 분배
파일: internal/changestream/eventmultiplexer/eventmultiplexer.go
Term을 받아 관심 있는 구독자에게 병렬 분배한다:
Term 수신
→ gatherSubscriptions(): 각 변경 이벤트에 관심 있는 구독 찾기
1. 필터 없는 구독 (전체 구독) 추가
2. namespace별 구독 중:
- 변경 타입 비트마스크 확인 (CREATE|UPDATE|DELETE)
- 커스텀 필터 함수 실행
→ dispatchSet(): errgroup으로 병렬 전송
- 구독자당 10초 타임아웃
- 타임아웃 시 해당 구독 자동 해제 (느린 구독자 보호)
→ term.Done(): Term 처리 완료 시그널
4단계: Subscription → Watcher
구독 등록 예시:
// machine 테이블의 모든 변경 구독
subscription, _ := watchableDB.Subscribe("machine-watcher",
changestream.Namespace("machine", changestream.All),
)
// 특정 UUID만 필터링
subscription, _ := watchableDB.Subscribe("unit-abc-watcher",
changestream.FilteredNamespace("unit", changestream.Changed,
func(e changestream.ChangeEvent) bool {
return e.Changed() == "unit-abc-uuid"
},
),
)
WatchableDB 일반 DB 접근(TxnRunner)에 Subscribe() 메서드를 추가한 확장 인터페이스. 트랜잭션 실행과 변경 구독을 하나의 객체에서 제공하여 domain service가 쉽게 사용할 수 있다. 는 네임스페이스(모델)별로 독립적이며, 각각 자체 Stream과 EventMultiplexer를 가진다.
5단계: Watcher가 소비
파일: core/watcher/eventsource/namespace.go
NamespaceWatcher.loop():
1. initialQuery 실행 → 현재 상태 조회
2. 초기 상태를 out 채널로 전송
3. 루프:
subscription.Changes() 수신
→ mapper 적용 (UUID 추출, 필터링)
→ out 채널로 전송
→ API facade가 수신하여 클라이언트에 WebSocket 전달
Watermark 기반 Pruning
change_log가 무한히 커지는 것을 방지하기 위해 watermark 각 controller가 어디까지 change_log를 처리했는지 기록하는 표식. HA 환경에서 가장 느린 controller의 watermark 이하의 로그만 안전하게 삭제할 수 있다. 시스템을 사용한다:
change_log_witness 테이블:
| 컬럼 | 설명 |
|---|---|
controller_id | controller 에이전트 ID |
lower_bound | 처리 완료한 가장 낮은 change_log.id |
upper_bound | 처리 완료한 가장 높은 change_log.id |
updated_at | 마지막 업데이트 시각 |
Pruning 흐름:
- 각 Stream이 5초마다 자신의 watermark를 DB에 기록
- Pruner가 모든 controller의
lower_bound중 최솟값을 조회 - 그 값 이하의 change_log 항목을 삭제
HA 환경에서 가장 느린 controller가 아직 처리하지 못한 로그를 삭제하지 않도록 보장한다.
전체 데이터 흐름 예시
시나리오: machine이 추가되고, 두 API 클라이언트가 machine을 watch 중
T0 INSERT INTO machine (uuid='m-1', ...)
→ 트리거: INSERT INTO change_log (id=100, type=CREATE,
ns='machine', changed='m-1')
T1 Stream.readChanges() 폴링 (upperBound=99)
→ SELECT ... FROM change_log WHERE id > 99
→ 결과: [{id:100, type:CREATE, ns:'machine', changed:'m-1'}]
T2 Term 생성, EventMultiplexer로 전송
T3 EventMultiplexer: gatherSubscriptions()
→ sub1: namespace='machine' 매칭 ✓
→ sub2: namespace='machine' 매칭 ✓
T4 dispatchSet(): errgroup 병렬 전송
→ sub1.changes <- [{CREATE, 'machine', 'm-1'}]
→ sub2.changes <- [{CREATE, 'machine', 'm-1'}]
T5 NamespaceWatcher1: mapper 적용 → ['m-1']
→ API client 1에 WebSocket 전송
T6 NamespaceWatcher2: mapper 적용 → ['m-1']
→ API client 2에 WebSocket 전송
버전별 비교
| 항목 | 2.9 | 3.6 | 4.0 |
|---|---|---|---|
| DB | MongoDB only | MongoDB + dqlite | dqlite only |
| 변경 감지 | txns.log capped collection 폴링 | MongoDB Change Streams | SQL 트리거 → change_log 폴링 |
| 폴링 전략 | 10ms ~ 5s exponential backoff | 이벤트 기반 (폴링 없음) | 100ms ~ 10s exponential backoff |
| 이벤트 분배 | Hub (pub/sub) → HubWatcher | Hub (pub/sub) → HubWatcher | EventMultiplexer → Subscription |
| 순서 보장 | capped collection 자연 순서 | oplog 순서 | change_log.id 단조 증가 시퀀스 |
| 변경 합침 | 타이밍 기반 암묵적 | 타이밍 기반 암묵적 | GROUP BY 명시적 coalescing |
| 느린 구독자 | 채널 블로킹 | 채널 블로킹 | 10초 타임아웃 후 자동 해제 |
| 오류 복구 | CappedPositionLost → 에이전트 재시작 | resumeToken으로 재개 | watermark에서 재개 |
| HA 동기화 | 각 controller 독립 | 각 controller 독립 | cross-controller watermark 조율 |
| 정리(Pruning) | capped collection 자동 | capped collection 자동 | watermark 기반 수동 pruning |
| 모니터링 | 없음 | 없음 | Prometheus 메트릭 Prometheus 형식의 메트릭. 폴링 소요 시간, 변경 수, 활성 구독 수, watermark 업데이트 횟수 등을 수집하여 운영 가시성을 제공한다. 내장 |
핵심 포인트
- 트리거 기반 캡처: 4.0은 SQL 트리거로 모든 변경을 change_log에 자동 기록 — 애플리케이션 코드 수정 없이 새 테이블도 즉시 모니터링 가능
- 명시적 Coalescing:
GROUP BY namespace_id, changed로 같은 엔티티의 연속 변경을 하나로 합쳐 이벤트 폭풍 방지 - 네임스페이스 격리: 모델별로 독립적인 Stream + EventMultiplexer, 한 모델의 부하가 다른 모델에 영향을 주지 않음
- Watermark 기반 Pruning: HA 환경에서 가장 느린 controller 기준으로 안전하게 로그 정리
- 느린 구독자 보호: 10초 내 소비하지 못하는 구독자를 자동 해제하여 전체 시스템 보호
- 3단계 진화: 폴링(2.9) → 이벤트 스트림(3.6) → 트리거+폴링(4.0) — 각 DB 엔진의 특성에 맞게 최적화
- 관측성: 4.0에서 Prometheus 메트릭 내장으로 운영 환경에서의 모니터링 가능
관련 문서
- Juju Status —
juju status --watch가 Change Stream을 활용하여 실시간 갱신하는 방법 - Leadership Management — lease 변경 이벤트가 Change Stream을 통해 리더십 워커에 전달되는 과정
- Model Migration — migration-inactive-flag 변경이 Change Stream을 통해 전파되어 워커를 자동 중지/시작하는 메커니즘