edge-processor — 엣지 데이터 처리기
historian-edge-processor · 공장 설비 태그를 Redis 버퍼에서 꺼내 수집 → 가상태그 → 알람 → InfluxDB 저장 → 정리 5단계로 처리하는 Spring Boot 애플리케이션. 신규 개발자·운영자가 구조·흐름·설정·위험 요소를 빠르게 파악하기 위한 문서입니다.
1개요
한 줄 정의. 별도 수집기(OPC/PLC 게이트웨이 추정)가 Redis에 적재한 실시간 설비 태그를, 이 프로세서가 꺼내 가공·저장·감시하는 처리 계층이다. 이름에 "edge"가 있지만 MQTT/Kafka/시리얼/TCP 등 디바이스 직접 통신은 없다 — 유입은 전적으로 Redis를 통한 간접 방식.
주요 책임 4가지
- 데이터 파이프라인 — Redis 태그 수집 → 가상태그 생성 → 알람 평가 → InfluxDB 저장 → Redis 정리
- 알람 엔진 — 태그 조건식 DSL 평가 → 알람 발생/해제 및 외부 알림 REST 전송
- 가상 태그 — 실측 태그를 산술식으로 조합한 파생 태그 계산·저장
- CPK 계산 · 태그 동기화 — 공정능력지수 산출, 외부 DB(MSSQL/MariaDB) 태그 메타 동기화
settings.gradle: historian-edge-processor · 패키지 루트 com.lsitc.historian.edge.processor · 개발사 LS ITC(group = com.lsitc). README는 코드 포맷/훅만 다루므로 기능은 코드로 확인함.2빠른 시작 · 실행
최초 체크아웃 후 (필수)
# Spotless 커밋 훅 + 커밋 메시지 템플릿 설치
./gradlew setup
빌드 · 실행 · 테스트
./gradlew clean build # 빌드 (jar 생성)
./gradlew bootRun # 로컬 실행 → http://localhost:8082
./gradlew test jacocoTestReport # 테스트 + 커버리지(JaCoCo)
./gradlew spotlessApply # 코드 포맷 자동 적용
application.yml의 주소로 접근 가능해야 부팅된다(JPA ddl-auto: validate라 스키마 불일치 시 기동 실패). 로컬 개발은 해당 인프라 접속 또는 프로퍼티 오버라이드 필요. 별도 프로파일(application-{env}.yml)은 없다 — 환경 분리는 기능 토글 + CI 파라미터로만.3아키텍처 · 데이터 흐름
패턴. 전통적 레이어드(api / service / domain / repository). 데이터 파이프라인만 Abstract Factory + Strategy로 legacy/json_v1 버전을 application.collect.version으로 전환한다.
.cursor/rules/project-rule.mdc는 헥사고날 아키텍처 + TDD를 "필수"로 규정하나, 실제 코드는 헥사고날 패키지(adapter/port)를 따르지 않는 레이어드 구조다. 테스트(40개)에는 TDD 흔적이 반영됨.실시간 파이프라인 5단계 DataPipelineService.processPipeline()
- 각 단계가 try/catch로 격리 — 한 단계 실패가 전체를 중단시키지 않음(저장 실패만
error, 나머지warn). - 수집 결과를
Map<String,Object>로 전달 → 타입 안전성 약함(§10 개선 대상). - 폴링:
json_v1은fixedDelay=20ms,legacy는cron 매초+@Async.
버전별 컴포넌트 매핑
| 단계 | 인터페이스 | json_v1 | legacy |
|---|---|---|---|
| 수집 | Collector | JsonV1Collector (Redis JSON 파싱) | LegacyCollector (^/, 문자열 파싱) |
| 가상태그 | VirtualTagGenerator | CommonVirtualTagGenerator (공용) | |
| 알람 | AlarmCaller | CommonAlarmCaller (공용) | |
| 저장 | InfluxSaver | JsonV1InfluxSaver | LegacyInfluxSaver |
| 정리 | Cleaner | JsonV1Cleaner (키 삭제) | NoOpCleaner |
팩토리: JsonV1ComponentsFactory / LegacyComponentsFactory 가 @ConditionalOnProperty로 선택 활성.
4핵심 모듈
4.1 데이터 파이프라인 pipeline/data
DataPipelineService가 @PostConstruct에서 팩토리로 5개 컴포넌트를 주입받아 순차 실행하는 오케스트레이터.
누락 데이터 복구 MissedDataRecoveryService: 10분마다 timeThreshold(2000ms)를 초과한 오래된 Redis 키를 배치(기본 60개)로 수집·저장. legacy 구현은 미지원(UnsupportedOperationException) — json_v1 전용. 복구 경로에선 알람 미실행.
4.2 알람 엔진 alarm, common/util/alarm
TagAlarmService/MultiAlarmService— 인메모리 룰 관리 + 평가 + 로그 저장 + 이벤트 발행.@Scheduled(1s)로 룰 활성/비활성 상태 전이.TagRuleManager/MultiRuleManager—Collections.newSetFromMap(ConcurrentHashMap)기반 스레드세이프 룰 셋.- 표현식 파서 —
Tokenizer → AlarmExpressionPreprocessor → RecursiveDescentParser → Condition 트리(§8 문법 참조). - 알림 발송 —
AlarmFiredEvent→@Async("alarmEventExecutor")+@TransactionalEventListener(AFTER_COMMIT)→RestClientPOST/api/v1/alerts/notify(타임아웃 미적용·재시도 없음 — 상수 3s는 코드상 주석처리, 심층분석 ② 참조).
application.alarm.use=false가 기본값. 알람 엔진 코드는 존재하나 발화(fire)는 이 토글로 제어됨. 운영 활성화 시 true로 전환 필요.4.3 가상 태그 virtualtag
VirtualTag—tagId,expression({{TAG}}참조),savingPolicy애그리거트.VirtualTagExpressionEvaluator— Shunting-Yard 알고리즘으로+ - * /산술식 평가(중위→후위, 0 나눗셈·괄호 불일치 예외 처리).SavingPolicy/SavingInterval— 기간·간격("5m","10s","2h") 저장 정책,shouldSaveAt(LocalDateTime).VirtualTagService—@Scheduled(fixedRate=1s)로 DB 변화 감지, 병렬 스트림 생성·저장.
4.4 CPK · 동기화 service
| 서비스 | 역할 | 주기 | 기본 |
|---|---|---|---|
SbCpkCalcService | 공정능력지수(CPK) 산출 → InfluxDB write | 1분 cron | off |
KolonTagSyncService | MSSQL "Kolon" 태그 → MariaDB 동기화 (DA_TYPE→TagType) | 1분 cron | off |
TagProcessSyncService | process 미할당 태그를 InfluxDB PROC 조회로 보강 | 5분 cron | off |
SbCpkCalcService의 생산 데이터(prodList)가 하드코딩 목(mock) 상태 — PoC 흔적. MyBatis 매퍼(SebangCpkMapper.xml)도 설정만 있고 실계산은 InfluxDB Flux로 수행. CPK 관련 토글은 모두 기본 false.4.5 도메인 모델 domain
Tag—id,value(Double 또는 String 다형성),process,line,influxTags(Map).isNumeric()/getNumericValue()등 제공,id기준 equals.TagInfoMap—StampedLock기반 싱글톤 스레드세이프 레지스트리(낙관적 읽기 락).
5외부 연동 · 인프라
| 대상 | 용도 | 접근 방식 | 주소(운영 기본) |
|---|---|---|---|
| Redis | 실시간 태그 수집 버퍼 | RedisTemplate, 해시 json_v1/STATE | 10.123.202.125:6379 db 0 |
| InfluxDB 7.x | 시계열 저장/조회 | WriteApiBlocking, QueryApi(Flux) | 10.123.202.120:8086 org lsitc |
| MariaDB (primary) | 알람·가상태그 메타/로그 | JPA(Hibernate), Primary | 10.123.202.124:3306/factory_sight |
| MSSQL (kolon) | 태그 마스터 동기화 소스 | JPA, 별도 EM, 조건부(tag-sync.use) | 10.123.202.121:1433/HISTORIAN |
| 알림 서버 | 알람 발생 REST 푸시 | RestClient POST /api/v1/alerts/notify | 10.123.202.122:8080 |
InfluxDB 버킷: raw test / aggr aggr, measurement default_line. 저장 정밀도 WritePrecision.S(초).
application.yml에 DB 비밀번호와 InfluxDB 토큰이 평문으로 노출되어 있다(예: MariaDB fsuser / Lsitc#••••, MSSQL scr_admin / sqladmin••••, InfluxDB 토큰 전체). 본 매뉴얼에는 값을 마스킹함.조치: 환경변수 / Vault / K8s Secret 으로 즉시 분리하고, 노출된 자격증명은 로테이션(교체) 권장.
6설정 레퍼런스 (feature flags)
거의 모든 기능이 토글로 제어된다. 아래는 src/main/resources/application.yml 실제 기본값 기준.
| 프로퍼티 | 기본값 | 의미 |
|---|---|---|
application.collect.use | true | 수집 파이프라인 활성 |
application.collect.version | json_v1 | 파이프라인 버전(json_v1|legacy) |
application.virtual_tag.use | true | 가상 태그 생성 활성 |
application.alarm.use | false | 알람 발화 활성 |
application.alarm.notification.enabled | true | 알람 외부 알림 전송 |
application.redis.time-threshold-ms | 2000 | 수집 신선도 임계값(ms), 초과 시 누락 복구 대상 |
application.missed-data-collect.use | true (코드 기본) | 누락 데이터 배치 복구 |
application.tag-sync.use | false | MSSQL→MariaDB 태그 동기화 |
application.tag-process-sync.use | false | 태그-프로세스 매핑 동기화 |
application.cpk-calc.use | false | CPK 계산 |
application.cpk-alarm.use | false | CPK 알람 워커 |
application.init.use_csv | false | CSV 초기 데이터 적재 |
ANALYSIS.md는 cpk-calc.use를 "true만 활성"으로 적었으나, 현재 application.yml에서는 false이며 alarm.use도 false다. 본 매뉴얼은 실제 설정 파일을 근거로 한다. JPA는 ddl-auto: validate, show-sql: true, MySQL8 dialect.7스케줄 작업 (10)
모든 스케줄은 단일 TaskScheduler(ThreadPoolTaskScheduler, poolSize 10)를 공유. 알림 이벤트만 별도 alarmEventExecutor(core 5/max 10/queue 100, CallerRunsPolicy).
| # | 클래스 / 메서드 | 주기 | 유형 | 활성 조건 | 기본 |
|---|---|---|---|---|---|
| 1 | JsonV1DataPipelineScheduler.executePipeline | 20ms | fixedDelay | collect.version=json_v1 | on |
| 2 | LegacyDataPipelineScheduler.executePipeline | 매초 | cron | collect.version=legacy | off |
| 3 | MissedDataRecoveryService.processPipeline | 10분 | fixedDelay | json_v1 + missed-data-collect.use | on |
| 4 | VirtualTagService.scanAndDetectChanges | 1s | fixedRate | 무조건 | on |
| 5 | TagAlarmService 상태전이 | 1s | fixedDelay | initialized 후 | on |
| 6 | MultiAlarmService 상태전이 | 1s | fixedDelay | initialized 후 | on |
| 7 | TagProcessSyncService.syncTagProcesses | 5분 | cron | tag-process-sync.use | off |
| 8 | KolonTagSyncService.syncTags | 1분 | cron | tag-sync.use | off |
| 9 | SbCpkAlarmWorker.execute | 5s | fixedDelay | cpk-alarm.use | off |
| 10 | SbCpkCalcService.sebangCpkCalc | 1분 | cron | cpk-calc.use | off |
fixedDelay=20은 단위 없는 밀리초 — 초로 오인 소지, 의도 확인 필요. ② 1s·20ms 다수 작업이 poolSize 10 단일 스케줄러 공유 → 한 작업이 길어지면 다른 스케줄 지연. 핫 경로 분리 검토. ③ #4만 fixedRate(나머지 fixedDelay) — 처리시간이 주기 초과 시 동작 차이.8알람 표현식 DSL
common/util/alarm의 재귀하강 파서가 처리하는 조건식 문법.
expression ::= term (OR term)*
term ::= factor (AND factor)*
factor ::= '(' expression ')' | comparison
comparison ::= {{TAG_ID}} OP value # OP: == != > >= < <=
- 우선순위:
()>AND>OR - 비교 연산자:
== != > >= < <=— 등호/부등호는 숫자·문자 모두, 대소 비교(> <등)는 숫자만 - 태그 참조:
{{TAG_ID}}형식으로 실측/가상 태그 값을 치환
# 예시: 온도 태그가 80 초과이고 압력이 100 이상일 때
{{TEMP_01}} > 80 AND {{PRESS_01}} >= 100
# 예시: 상태가 정지이거나 알람 플래그가 켜졌을 때
{{STATUS}} == STOP OR {{ALARM_FLG}} == 1
가상 태그 산술식은 별도로 VirtualTagExpressionEvaluator(Shunting-Yard)가 + - * /와 괄호를 평가한다.
9운영 · 트러블슈팅
| 증상 | 가능성 높은 원인 | 확인 지점 |
|---|---|---|
| 기동 실패 (JPA validate 오류) | MariaDB 스키마 ↔ 엔티티 불일치 | ddl-auto: validate, factory_sight 스키마 |
| 부팅 시 커넥션 오류 | Redis/InfluxDB/MariaDB 접근 불가 | §5 주소, 네트워크·방화벽 |
| 데이터가 InfluxDB에 안 쌓임 | Redis에 태그 미유입 / collect.use=false | Redis 해시 json_v1, 토글 |
| 알람이 안 울림 | alarm.use=false (기본) | §6 토글, 룰 초기화 로그 |
| 알림 유실 | 알림 REST 재시도 없음, 타임아웃 3s | 알림 서버 상태, AFTER_COMMIT 로그 |
| 오래된 데이터 지연 반영 | 누락 복구는 10분 주기 배치 | MissedDataRecoveryService, time-threshold-ms |
| 스케줄 전반 지연 | 단일 스케줄러 풀(10) 경합 | §7, 장기 실행 작업 로그 |
.gitlab-ci.yml의 test 단계가 || true로 실패해도 통과한다 — 회귀 방지 효과 없음. 배포 전 테스트 결과를 수동 확인하거나 게이트 활성화 권장.10기술 부채 · 개선 과제
| # | 항목 | 영역 | 심각도 |
|---|---|---|---|
| 1 | DB 패스워드·InfluxDB 토큰 평문 하드코딩 → 환경변수/Vault 분리 + 로테이션 | 보안 | High |
| 2 | 파이프라인이 Map<String,Object> 전달, @SuppressWarnings("unchecked") 다수 → 타입드 레코드로 전환 | 유지보수 | Medium |
| 3 | 규칙(.cursor 헥사고날/TDD 필수) ↔ 실제 레이어드 구조 괴리 → 정합성 확보 | 유지보수 | Medium |
| 4 | CPK 미완성 — prodList 하드코딩 목, MyBatis 매퍼 미사용 잔재 | 기능 | Medium |
| 5 | CI 테스트 게이트 || true로 무력화 → 게이트 활성화 | 품질 | Medium |
| 6 | CompletableFuture.runAsync()가 공용 ForkJoinPool 사용 → 전용 executor 지정 | 성능 | Low |
| 7 | fixedDelay=20ms 공격적 폴링 → 빈 큐 백오프 검토 | 성능 | Low |
| 8 | MyBatis application.yml/XML 존재하나 build.gradle 의존성 없음 → 잔재 정리 또는 의존성 추가 | 빌드 | Low |
11신규 개발자 가이드
읽는 순서
HistorianEdgeProcessorApplication.java— 진입점config/— DataSource·Redis·InfluxDB·Scheduler 설정으로 인프라 그림 파악pipeline/data/service/DataPipelineService.java— 파이프라인 5단계 오케스트레이션(핵심)pipeline/data/factory/+component/impl/{json_v1,legacy}— 버전별 구현domain/Tag.java,domain/TagInfoMap.java— 도메인 모델alarm/service/*+common/util/alarm/RecursiveDescentParser— 알람 엔진·파서virtualtag/VirtualTagExpressionEvaluator— 가상태그 산술 평가src/main/resources/application.yml— 토글·연동 총람
실제 소스를 라인 단위로 정독한 두 핵심 흐름의 상세 분석입니다. 코드 인용은 파일:라인 기준이며, 코드로 확정하지 못한 부분은 (추정)으로 표기합니다.
🔬심층분석 ① 태그 수집 흐름
범위. 운영 기본값 application.collect.version=json_v1(application.yml:48) 경로를 중심으로, legacy는 대조군으로 병기한다. 데이터가 Redis 해시 문자열 → Map<String,Object> → List<Tag> → InfluxDB Point로 변형되는 전 과정을 추적한다.
1) Redis 수집 소스 — 두 개의 해시
| 버전 | Redis 키(Hash) | 필드 | 값 |
|---|---|---|---|
| json_v1 | Hash json_v1 | epoch-millis 타임스탬프 문자열 | JSON 문자열 |
| legacy | Hash STATE | 상수 "STATE" | ^/, 구분 문자열 |
직렬화는 전부 StringRedisSerializer(RedisConfig.java:16-18) — Redis raw는 순수 문자열.
신선도(±2초) 판정 · 최신 키 선택 getLatestTimestampKey() (RedisService.java:28-54)
long currentTime = Instant.now().getEpochSecond() * 1000; // 40
return keys.stream()
.filter(key -> Math.abs(Long.parseLong(key) - currentTime) <= timeThreshold) // 47
.max((k1,k2) -> Long.compare(Long.parseLong(k1), Long.parseLong(k2))) // 52
.orElse(null);
timeThreshold=application.redis.time-threshold-ms:2000. 현재 시각 ±2초 이내 키만 통과, 그중 가장 최근(max) 1건만 소비.- 누락 분류
getMissedTimestampKey()(:56-83):(currentTime - timestamp) > 2000— 2초 넘게 과거인 키 전부를 오래된 순 List로. → 복구 대상.
getEpochSecond()*1000으로 밀리초를 버리고 ×1000 → 키(epoch-millis)와 비교 시 경계에서 최대 999ms 비대칭 오차 (추정: 의도치 않은 미세 버그). ② 판정 사각지대: 시계가 크게 앞서 timestamp가 미래이면서 2초 밖인 키는 latest(abs≤2000)·missed(차이>2000 양수) 어느 필터에도 안 잡혀 방치될 수 있음 (추정).2) Collector 파싱 — 데이터 변형
JsonV1Collector (JsonV1Collector.java:135-193)
입력 JSON 스키마와 매핑:
{ "data":[ { "field":{ "<tagId>": <value> }, "tag":{ "PROC":"P1", "<k>":"<v>" } } ] }
field 엔트리 → Tag.id = key, Tag.value = 숫자면 Double / 문자면 String
tag.PROC → Tag.process (PROC 키는 influxTags에서 제외)
tag의 나머지 → Tag.influxTags(Map)
- 타입 판별
parseValueFromJson(:202-215):isNumber→Double,isTextual→String,isNull→스킵, 미지원 타입은warn후 스킵. - 성공/실패 모두
dataKey를 반환 Map에 포함(정리 단계에서 사용).line필드는 수집 경로에서 세팅되지 않음(확인된 사실).
LegacyCollector (LegacyCollector.java:60-68)
data.split("\^") // 행 구분 '^'
→ row.split(",") // 열 구분 ','
→ Tag.builder().id(t[0]).value(Double.parseDouble(t[1])).build();
Double.parseDouble → 문자/빈값이면 NumberFormatException, 배열 길이 부족 시 ArrayIndexOutOfBounds → 상위 catch(Exception)가 배치 전체를 실패 처리(부분 격리 없음). process/line/influxTags 미설정(메타 보강은 LegacyInfluxSaver가 TagInfoMap으로 수행 — 클래스 주석과 위치 불일치).Tag.equals/hashCode는 id만 기준(Tag.java:75-86) → 동일 id 중복 시 Set/merge 충돌 여지.
3) 파이프라인 5단계 — 격리와 로깅 레벨
DataPipelineService는 @PostConstruct에서 PipelineComponentsFactory로 5개 컴포넌트를 생성해 보관(:44-53). processPipeline()(:59-119) 단계별 격리:
| 단계 | 호출 | 실패 시 |
|---|---|---|
| 1. 수집 | collector.collect() | success=false/빈 tags → return(중단), warn |
| —. 가상태그 | virtualTagGenerator.generate() | try/catch warn, 계속 |
| —. merge | mergeAlarmTags(tags, virtualTags) | 단순 concat(중복제거 없음) |
| 2. 알람 | alarmCaller.fireAlarms(alarmTags, ts) | try/catch warn |
| 3. 저장 | influxSaver.save(tags, ts) | try/catch error (유일) |
| 4. 정리 | cleaner.cleanup(createCleanCommand(...)) | try/catch warn |
- 알람은
tags + virtualTags(alarmTags), 저장은 실물tags만 대상. 가상태그 저장은generate()내부에서 별도 수행(추정).
error로 실패해도 catch로 삼켜지고 4단계 정리가 그대로 실행되어 Redis 원본(dataKey)이 삭제된다. 이미 삭제됐으므로 누락 복구 대상에도 안 들어가 데이터 유실 가능 (추정: 위험 지점).4) 스케줄러 · 누락 복구
| 스케줄러 | 주기 | @Async | 재진입 |
|---|---|---|---|
JsonV1DataPipelineScheduler | fixedDelay=20ms | 없음 | 비재진입(직렬) |
LegacyDataPipelineScheduler | cron 매초 | 있음 | 재진입 가능 |
누락 복구 MissedDataRecoveryService(@Scheduled(fixedDelay=10분), json_v1 전용): getMissedTimestampKey()로 오래된 키를 batchSize 60 단위 Lazy Stream으로 수집 → virtualTag→influx→clean 순 처리. 알람은 미발화(AlarmCaller 미주입, :52). legacy는 UnsupportedOperationException(빈 자체가 생성 안 되어 실경로 도달 불가).
5) 수집 흐름 시퀀스 (json_v1)
HKEYS는 해시 전체 필드 스캔이라 적체 시 O(N) 비용이며 20ms 폴링과 결합 시 부하 요인. 병렬 스트림은 사용하지 않음(전부 순차).matchIfMissing)은 legacy. collect.version 프로퍼티가 누락되면 legacy로 폴백된다.🚨심층분석 ② 알람 프로세스
알람은 단일 태그와 다중 태그 두 계열이 거의 대칭 구조로 존재한다. 현재 application.alarm.use=false(yml)라 발화 경로 자체는 기본 비활성이나, 활성화 시 아래 생애주기로 동작한다.
| 구분 | 단일 태그 | 다중 태그 |
|---|---|---|
| 룰 도메인 | TagAlarmRule | MultiAlarmRule |
| 인메모리 저장소 | TagRuleManager | MultiRuleManager |
| 평가 서비스 | TagAlarmService | MultiAlarmService |
| 조건 소스 | upper/lower/exact 필드 → TagAlarmConditionFactory | DB expression 문자열 → AlarmExpressionParser |
1) 인메모리 룰 셋 — tagId가 곧 식별자
Set<TagAlarmRule> tagRules = Collections.newSetFromMap(new ConcurrentHashMap<>());
Set<TagAlarmRule> firedTagRules = Collections.newSetFromMap(new ConcurrentHashMap<>()); // TagRuleManager:20-26
tagRules= 평가 대상 활성 룰 전체.firedTagRules= 이미 발화되어 억제 중인 룰의clone()사본.equals/hashCode가tagId만 사용(TagAlarmRule.java:256-266) → tagId당 룰 1개. (Multi는multiAlarmNo기준)- 초기화
@PostConstruct init(): 재시작 잔여deactivating정리 →activated룰 로드 →initialized=true. 이 플래그가 상태전이 스케줄의 조기 실행을 가드.
2) 표현식 DSL — 파싱 파이프라인
AlarmExpressionParser.parse() → Preprocessor → Tokenizer → RecursiveDescentParser → Condition 트리.
expression ::= term (OR term)* // OR 최저 우선순위, 좌결합
term ::= factor (AND factor)* // AND > OR
factor ::= singleCondition | '(' expression ')' // 괄호 최고
singleCondition ::= TAG_ID OP (NUMBER | STRING) // OP: == != > >= < <=
- 전처리(
Preprocessor): 단독=→==,'ON'→1,'OFF'→0(작은따옴표만). - 토큰화:
{{...}}→TAG_ID,"..."→STRING,AND/OR은 대소문자 무시+경계 필요, 음수·소수 지원, 단독=는 예외(전처리 전제). {{TAG}}는 값 치환이 아니다 — 토큰에 tagId 문자열만 담고, 평가 시점에tagValues.get(tagId)로 런타임 lookup(SingleCondition.java:44-49).
==/!=는 숫자·문자 모두 지원하되 타입이 다르면 무조건 false. > >= < <=는 숫자(DoubleValue)만 — 문자열이 섞이면 UnsupportedOperationException. 이 예외가 상위 평가에서 legacy 폴백을 유발할 수 있음(아래 4).{{TEMP}} > 80 AND {{PRESS}} >= 100토큰: TAG_ID(TEMP) GT(>) NUM(80) AND TAG_ID(PRESS) GE(>=) NUM(100) EOF
트리: CompoundCondition(AND)
├ SingleCondition(TEMP, >, 80.0)
└ SingleCondition(PRESS, >=, 100.0)
평가: {TEMP:85, PRESS:120} → 85>80(T) AND 120>=100(T) → true, winning=(PRESS>=100)
{TEMP:85, PRESS:90} → 첫 조건 T, 둘째 90>=100(F) → 단락평가로 즉시 false3) 두 종류의 스케줄 — 혼동 금지
| 구분 | (A) 룰 라이프사이클 | (B) 값 평가·fire/clear |
|---|---|---|
| 주기 | @Scheduled(fixedDelay=1000) (1초) | 파이프라인 20ms → fireAlarms |
| 하는 일 | activating→룰 추가 / deactivating→룰 제거 (DB 폴링) | 실제 조건 평가로 발생/해제 |
| 스레드 | taskScheduler(풀 10) | 공용 ForkJoinPool |
(A)는 "룰을 평가 대상에 넣고 빼는" 관리 작업일 뿐, fire/clear가 아니다.
4) fire / clear 전이 (값 평가)
핵심 evaluateAndFilterRulesWithAlarmValue(TagAlarmService.java:263-317) — rules.parallelStream():
- 미발생→발생
addFiredRuleIfNew(:375-383):firedTagRules.contains면 억제(중복 차단), 아니면clone()을 셋에 추가하고 신규 발화만 하류로 →occurred로그 저장. - 발생→해제
resolveFiredTagRule(:385-395): 열린 로그를resolved로 전환 후 셋에서 제거. - 디지털 이벤트성 예외:
onToOff/offToOn등은 자동 해제하지 않음(엣지 이벤트). - 억제/재활성: 매 실행 앞에서
removeInvalidFiredRules(now)가canReactivate(suppression 경과, 기본FOREVER) 룰을firedTagRules에서 제거 → 재발화 허용.
발화 시 createAndSaveAlarmLogs가 occurred 로그 saveAll 후 publishEvent(AlarmFiredEvent)(:539). 저장 실패는 catch만 하고 이벤트 미발행(조용히 유실).
5) 비동기 발화 · 알림 전송
// CommonAlarmCaller.java:37-38 (useAlarm=application.alarm.use, yml=false)
CompletableFuture.runAsync(() -> tagAlarmService.executeTagAlarm(tags, ts));
CompletableFuture.runAsync(() -> multiAlarmService.executeMultiAlarm(tags, ts));
스레드풀: runAsync(Runnable)는 Executor 미지정 → 공용 ForkJoinPool.commonPool(). 내부 parallelStream도 commonPool → 평가 태스크와 스트림이 같은 풀 공유. fire-and-forget이라 예외는 삼켜짐.
알림 AlarmNotificationEventListener:
| 실행 | @Async("alarmEventExecutor") + @TransactionalEventListener(AFTER_COMMIT, fallbackExecution=true) |
| 대상 | RestClient POST {alarm.notification.url}/api/v1/alerts/notify (yml 10.123.202.122:8080) |
| 페이로드 | {"alarms":[{alarmType, alarmLogNo}, ...]} |
| 재시도 | 없음 (catch 후 log.error만) |
CONNECTION/READ_TIMEOUT=3000 상수와 createRequestFactory()가 있으나, 생성자에서 그 팩토리 사용 라인이 주석 처리되고 new SimpleClientHttpRequestFactory()를 그대로 쓴다 → 타임아웃 미설정(사실상 무한 대기). 알림 서버가 지연되면 alarmEventExecutor(max 10/queue 100/CallerRunsPolicy) 스레드가 묶이고, 큐 초과 시 커밋 직후 스레드가 대신 실행되어 지연이 전파될 수 있다. (요약 §4·§7의 "타임아웃 3s"는 상수 존재 기준 표기 — 실제 적용은 이 정정을 따른다.)6) REST API · 생애주기 시퀀스
| 컨트롤러 | 엔드포인트 |
|---|---|
TagAlarmRuleController/api/v1/alarm/tag-alarm | POST /reactivate(firedTagRules 제거), GET /status — register/unregister 없음(DB status 폴링으로 반영) |
MultiAlarmRuleController/api/v1/alarm/multi-alarm | POST /(register), DELETE /?tid=, POST /reactivate, GET /status |
7) 동시성 주의점 (요약)
| # | 항목 |
|---|---|
| 1 | commonPool 미격리·무역압: fireAlarms의 runAsync×2와 내부 parallelStream이 전부 공용 ForkJoinPool. 20ms 폴링이 fire-and-forget 제출 → 평가가 20ms 초과 시 태스크 누적. |
| 2 | 셋 복합연산 비원자성: 상태전이(1s)의 addAll/removeAll과 평가의 contains→add(addFiredRuleIfNew)가 같은 셋을 동시 수정. ConcurrentHashMap이라 개별 연산은 안전하나 복합 연산은 비원자적 → 짧은 창에서 중복 fire/해제 교차 여지 (추정). |
| 3 | 룰 객체 가변 상태: currentValue/lastValue/firedTime이 평가 중 뮤테이트. 20ms 겹침 실행 시 상태 시퀀스 엉킴 가능. firedTagRules는 clone()이라 로그 스냅샷은 보존 (추정). |
| 4 | 폴백 재평가 부작용: AlarmValue 경로가 예외 시 legacy 전체 재실행 → 예외 이전 firedTagRules 부분 변경과 겹칠 수 있음(멱등성 미보장, 추정). |