Developer Manual · 개발 매뉴얼

edge-processor — 엣지 데이터 처리기

historian-edge-processor · 공장 설비 태그를 Redis 버퍼에서 꺼내 수집 → 가상태그 → 알람 → InfluxDB 저장 → 정리 5단계로 처리하는 Spring Boot 애플리케이션. 신규 개발자·운영자가 구조·흐름·설정·위험 요소를 빠르게 파악하기 위한 문서입니다.

☕ Java 21 🍃 Spring Boot 3.4.3 🐘 Gradle 📦 main 99 / test 40 🗓 작성일 2026-06-29 🤖 작성 Tars

1개요

한 줄 정의. 별도 수집기(OPC/PLC 게이트웨이 추정)가 Redis에 적재한 실시간 설비 태그를, 이 프로세서가 꺼내 가공·저장·감시하는 처리 계층이다. 이름에 "edge"가 있지만 MQTT/Kafka/시리얼/TCP 등 디바이스 직접 통신은 없다 — 유입은 전적으로 Redis를 통한 간접 방식.

주요 책임 4가지

  1. 데이터 파이프라인 — Redis 태그 수집 → 가상태그 생성 → 알람 평가 → InfluxDB 저장 → Redis 정리
  2. 알람 엔진 — 태그 조건식 DSL 평가 → 알람 발생/해제 및 외부 알림 REST 전송
  3. 가상 태그 — 실측 태그를 산술식으로 조합한 파생 태그 계산·저장
  4. CPK 계산 · 태그 동기화 — 공정능력지수 산출, 외부 DB(MSSQL/MariaDB) 태그 메타 동기화
ℹ️ 근거
settings.gradle: historian-edge-processor · 패키지 루트 com.lsitc.historian.edge.processor · 개발사 LS ITC(group = com.lsitc). README는 코드 포맷/훅만 다루므로 기능은 코드로 확인함.

2빠른 시작 · 실행

최초 체크아웃 후 (필수)

# Spotless 커밋 훅 + 커밋 메시지 템플릿 설치
./gradlew setup

빌드 · 실행 · 테스트

./gradlew clean build          # 빌드 (jar 생성)
./gradlew bootRun              # 로컬 실행 → http://localhost:8082
./gradlew test jacocoTestReport # 테스트 + 커버리지(JaCoCo)
./gradlew spotlessApply        # 코드 포맷 자동 적용
서버 포트
8082
기본 수집 버전
json_v1
버전 프로퍼티
revision/changelist
JDK Toolchain
Java 21
⚠️ 실행 전제
Redis·InfluxDB·MariaDB가 application.yml의 주소로 접근 가능해야 부팅된다(JPA ddl-auto: validate라 스키마 불일치 시 기동 실패). 로컬 개발은 해당 인프라 접속 또는 프로퍼티 오버라이드 필요. 별도 프로파일(application-{env}.yml)은 없다 — 환경 분리는 기능 토글 + CI 파라미터로만.

3아키텍처 · 데이터 흐름

패턴. 전통적 레이어드(api / service / domain / repository). 데이터 파이프라인만 Abstract Factory + Strategylegacy/json_v1 버전을 application.collect.version으로 전환한다.

⚠️ 규칙 ↔ 구현 괴리
.cursor/rules/project-rule.mdc헥사고날 아키텍처 + TDD를 "필수"로 규정하나, 실제 코드는 헥사고날 패키지(adapter/port)를 따르지 않는 레이어드 구조다. 테스트(40개)에는 TDD 흔적이 반영됨.

실시간 파이프라인 5단계 DataPipelineService.processPipeline()

[Redis json_v1 / STATE 해시] │ (1) Collector.collect() → List<Tag> + timestamp + dataKey ▼ realTags │ (2) VirtualTagGenerator.generate() → 파생(가상) 태그 생성 ▼ realTags + virtualTags 병합 (mergeAlarmTags) │ (3) AlarmCaller.fireAlarms() → 비동기 알람 평가 │ (4) InfluxSaver.save(realTags) → InfluxDB Point 일괄 write │ (5) Cleaner.cleanup(dataKey) → Redis 처리완료 키 삭제 ▼ [InfluxDB measurement=default_line] + [알람 로그 / 외부 알림]
  • 각 단계가 try/catch로 격리 — 한 단계 실패가 전체를 중단시키지 않음(저장 실패만 error, 나머지 warn).
  • 수집 결과를 Map<String,Object>로 전달 → 타입 안전성 약함(§10 개선 대상).
  • 폴링: json_v1fixedDelay=20ms, legacycron 매초 + @Async.

버전별 컴포넌트 매핑

단계인터페이스json_v1legacy
수집CollectorJsonV1Collector (Redis JSON 파싱)LegacyCollector (^/, 문자열 파싱)
가상태그VirtualTagGeneratorCommonVirtualTagGenerator (공용)
알람AlarmCallerCommonAlarmCaller (공용)
저장InfluxSaverJsonV1InfluxSaverLegacyInfluxSaver
정리CleanerJsonV1Cleaner (키 삭제)NoOpCleaner

팩토리: JsonV1ComponentsFactory / LegacyComponentsFactory@ConditionalOnProperty로 선택 활성.

4핵심 모듈

4.1 데이터 파이프라인 pipeline/data

DataPipelineService@PostConstruct에서 팩토리로 5개 컴포넌트를 주입받아 순차 실행하는 오케스트레이터.

누락 데이터 복구 MissedDataRecoveryService: 10분마다 timeThreshold(2000ms)를 초과한 오래된 Redis 키를 배치(기본 60개)로 수집·저장. legacy 구현은 미지원(UnsupportedOperationException) — json_v1 전용. 복구 경로에선 알람 미실행.

4.2 알람 엔진 alarm, common/util/alarm

  • TagAlarmService / MultiAlarmService — 인메모리 룰 관리 + 평가 + 로그 저장 + 이벤트 발행. @Scheduled(1s)로 룰 활성/비활성 상태 전이.
  • TagRuleManager / MultiRuleManagerCollections.newSetFromMap(ConcurrentHashMap) 기반 스레드세이프 룰 셋.
  • 표현식 파서Tokenizer → AlarmExpressionPreprocessor → RecursiveDescentParser → Condition 트리 (§8 문법 참조).
  • 알림 발송AlarmFiredEvent@Async("alarmEventExecutor") + @TransactionalEventListener(AFTER_COMMIT)RestClient POST /api/v1/alerts/notify (타임아웃 미적용·재시도 없음 — 상수 3s는 코드상 주석처리, 심층분석 ② 참조).
⚠️ 기본 비활성
application.alarm.use=false가 기본값. 알람 엔진 코드는 존재하나 발화(fire)는 이 토글로 제어됨. 운영 활성화 시 true로 전환 필요.

4.3 가상 태그 virtualtag

  • VirtualTagtagId, expression({{TAG}} 참조), savingPolicy 애그리거트.
  • VirtualTagExpressionEvaluatorShunting-Yard 알고리즘으로 + - * / 산술식 평가(중위→후위, 0 나눗셈·괄호 불일치 예외 처리).
  • SavingPolicy / SavingInterval — 기간·간격("5m","10s","2h") 저장 정책, shouldSaveAt(LocalDateTime).
  • VirtualTagService@Scheduled(fixedRate=1s)로 DB 변화 감지, 병렬 스트림 생성·저장.

4.4 CPK · 동기화 service

서비스역할주기기본
SbCpkCalcService공정능력지수(CPK) 산출 → InfluxDB write1분 cronoff
KolonTagSyncServiceMSSQL "Kolon" 태그 → MariaDB 동기화 (DA_TYPETagType)1분 cronoff
TagProcessSyncServiceprocess 미할당 태그를 InfluxDB PROC 조회로 보강5분 cronoff
⚠️ CPK 미완성
SbCpkCalcService의 생산 데이터(prodList)가 하드코딩 목(mock) 상태 — PoC 흔적. MyBatis 매퍼(SebangCpkMapper.xml)도 설정만 있고 실계산은 InfluxDB Flux로 수행. CPK 관련 토글은 모두 기본 false.

4.5 도메인 모델 domain

  • Tagid, value(Double 또는 String 다형성), process, line, influxTags(Map). isNumeric()/getNumericValue() 등 제공, id 기준 equals.
  • TagInfoMapStampedLock 기반 싱글톤 스레드세이프 레지스트리(낙관적 읽기 락).

5외부 연동 · 인프라

대상용도접근 방식주소(운영 기본)
Redis실시간 태그 수집 버퍼RedisTemplate, 해시 json_v1/STATE10.123.202.125:6379 db 0
InfluxDB 7.x시계열 저장/조회WriteApiBlocking, QueryApi(Flux)10.123.202.120:8086 org lsitc
MariaDB (primary)알람·가상태그 메타/로그JPA(Hibernate), Primary10.123.202.124:3306/factory_sight
MSSQL (kolon)태그 마스터 동기화 소스JPA, 별도 EM, 조건부(tag-sync.use)10.123.202.121:1433/HISTORIAN
알림 서버알람 발생 REST 푸시RestClient POST /api/v1/alerts/notify10.123.202.122:8080

InfluxDB 버킷: raw test / aggr aggr, measurement default_line. 저장 정밀도 WritePrecision.S(초).

🔴 보안 High — 시크릿 평문 하드코딩
application.ymlDB 비밀번호와 InfluxDB 토큰이 평문으로 노출되어 있다(예: MariaDB fsuser / Lsitc#••••, MSSQL scr_admin / sqladmin••••, InfluxDB 토큰 전체). 본 매뉴얼에는 값을 마스킹함.
조치: 환경변수 / Vault / K8s Secret 으로 즉시 분리하고, 노출된 자격증명은 로테이션(교체) 권장.

6설정 레퍼런스 (feature flags)

거의 모든 기능이 토글로 제어된다. 아래는 src/main/resources/application.yml 실제 기본값 기준.

프로퍼티기본값의미
application.collect.usetrue수집 파이프라인 활성
application.collect.versionjson_v1파이프라인 버전(json_v1|legacy)
application.virtual_tag.usetrue가상 태그 생성 활성
application.alarm.usefalse알람 발화 활성
application.alarm.notification.enabledtrue알람 외부 알림 전송
application.redis.time-threshold-ms2000수집 신선도 임계값(ms), 초과 시 누락 복구 대상
application.missed-data-collect.usetrue (코드 기본)누락 데이터 배치 복구
application.tag-sync.usefalseMSSQL→MariaDB 태그 동기화
application.tag-process-sync.usefalse태그-프로세스 매핑 동기화
application.cpk-calc.usefalseCPK 계산
application.cpk-alarm.usefalseCPK 알람 워커
application.init.use_csvfalseCSV 초기 데이터 적재
ℹ️ ANALYSIS.md와의 차이
기존 ANALYSIS.mdcpk-calc.use를 "true만 활성"으로 적었으나, 현재 application.yml에서는 false이며 alarm.usefalse다. 본 매뉴얼은 실제 설정 파일을 근거로 한다. JPA는 ddl-auto: validate, show-sql: true, MySQL8 dialect.

7스케줄 작업 (10)

모든 스케줄은 단일 TaskScheduler(ThreadPoolTaskScheduler, poolSize 10)를 공유. 알림 이벤트만 별도 alarmEventExecutor(core 5/max 10/queue 100, CallerRunsPolicy).

#클래스 / 메서드주기유형활성 조건기본
1JsonV1DataPipelineScheduler.executePipeline20msfixedDelaycollect.version=json_v1on
2LegacyDataPipelineScheduler.executePipeline매초croncollect.version=legacyoff
3MissedDataRecoveryService.processPipeline10분fixedDelayjson_v1 + missed-data-collect.useon
4VirtualTagService.scanAndDetectChanges1sfixedRate무조건on
5TagAlarmService 상태전이1sfixedDelayinitializedon
6MultiAlarmService 상태전이1sfixedDelayinitializedon
7TagProcessSyncService.syncTagProcesses5분crontag-process-sync.useoff
8KolonTagSyncService.syncTags1분crontag-sync.useoff
9SbCpkAlarmWorker.execute5sfixedDelaycpk-alarm.useoff
10SbCpkCalcService.sebangCpkCalc1분croncpk-calc.useoff
⚠️ 운영 주의
fixedDelay=20은 단위 없는 밀리초 — 초로 오인 소지, 의도 확인 필요. ② 1s·20ms 다수 작업이 poolSize 10 단일 스케줄러 공유 → 한 작업이 길어지면 다른 스케줄 지연. 핫 경로 분리 검토. ③ #4만 fixedRate(나머지 fixedDelay) — 처리시간이 주기 초과 시 동작 차이.

8알람 표현식 DSL

common/util/alarm의 재귀하강 파서가 처리하는 조건식 문법.

expression ::= term (OR term)*
term       ::= factor (AND factor)*
factor     ::= '(' expression ')' | comparison
comparison ::= {{TAG_ID}} OP value        # OP: == != > >= < <=
  • 우선순위: () > AND > OR
  • 비교 연산자: == != > >= < <= — 등호/부등호는 숫자·문자 모두, 대소 비교(> < 등)는 숫자만
  • 태그 참조: {{TAG_ID}} 형식으로 실측/가상 태그 값을 치환
# 예시: 온도 태그가 80 초과이고 압력이 100 이상일 때
{{TEMP_01}} > 80 AND {{PRESS_01}} >= 100

# 예시: 상태가 정지이거나 알람 플래그가 켜졌을 때
{{STATUS}} == STOP OR {{ALARM_FLG}} == 1

가상 태그 산술식은 별도로 VirtualTagExpressionEvaluator(Shunting-Yard)가 + - * /와 괄호를 평가한다.

9운영 · 트러블슈팅

증상가능성 높은 원인확인 지점
기동 실패 (JPA validate 오류)MariaDB 스키마 ↔ 엔티티 불일치ddl-auto: validate, factory_sight 스키마
부팅 시 커넥션 오류Redis/InfluxDB/MariaDB 접근 불가§5 주소, 네트워크·방화벽
데이터가 InfluxDB에 안 쌓임Redis에 태그 미유입 / collect.use=falseRedis 해시 json_v1, 토글
알람이 안 울림alarm.use=false (기본)§6 토글, 룰 초기화 로그
알림 유실알림 REST 재시도 없음, 타임아웃 3s알림 서버 상태, AFTER_COMMIT 로그
오래된 데이터 지연 반영누락 복구는 10분 주기 배치MissedDataRecoveryService, time-threshold-ms
스케줄 전반 지연단일 스케줄러 풀(10) 경합§7, 장기 실행 작업 로그
ℹ️ CI 게이트
.gitlab-ci.yml의 test 단계가 || true실패해도 통과한다 — 회귀 방지 효과 없음. 배포 전 테스트 결과를 수동 확인하거나 게이트 활성화 권장.

10기술 부채 · 개선 과제

#항목영역심각도
1DB 패스워드·InfluxDB 토큰 평문 하드코딩 → 환경변수/Vault 분리 + 로테이션보안High
2파이프라인이 Map<String,Object> 전달, @SuppressWarnings("unchecked") 다수 → 타입드 레코드로 전환유지보수Medium
3규칙(.cursor 헥사고날/TDD 필수) ↔ 실제 레이어드 구조 괴리 → 정합성 확보유지보수Medium
4CPK 미완성 — prodList 하드코딩 목, MyBatis 매퍼 미사용 잔재기능Medium
5CI 테스트 게이트 || true로 무력화 → 게이트 활성화품질Medium
6CompletableFuture.runAsync()가 공용 ForkJoinPool 사용 → 전용 executor 지정성능Low
7fixedDelay=20ms 공격적 폴링 → 빈 큐 백오프 검토성능Low
8MyBatis application.yml/XML 존재하나 build.gradle 의존성 없음 → 잔재 정리 또는 의존성 추가빌드Low

11신규 개발자 가이드

읽는 순서

  1. HistorianEdgeProcessorApplication.java — 진입점
  2. config/ — DataSource·Redis·InfluxDB·Scheduler 설정으로 인프라 그림 파악
  3. pipeline/data/service/DataPipelineService.java파이프라인 5단계 오케스트레이션(핵심)
  4. pipeline/data/factory/ + component/impl/{json_v1,legacy} — 버전별 구현
  5. domain/Tag.java, domain/TagInfoMap.java — 도메인 모델
  6. alarm/service/* + common/util/alarm/RecursiveDescentParser — 알람 엔진·파서
  7. virtualtag/VirtualTagExpressionEvaluator — 가상태그 산술 평가
  8. src/main/resources/application.yml — 토글·연동 총람
✅ 강점
파이프라인의 Abstract Factory + Strategy + 인터페이스 분리로 버전 전환이 깔끔하고, 컴포넌트별 단위 테스트(40개)가 충실. 알람 파서(재귀하강)와 가상태그(Shunting-Yard)는 정석적이며 테스트로 검증됨. 단계별 try/catch 격리·기능 토글·누락 복구 등 운영 견고성 고려가 보인다.
Deep Dive · 심층분석

실제 소스를 라인 단위로 정독한 두 핵심 흐름의 상세 분석입니다. 코드 인용은 파일:라인 기준이며, 코드로 확정하지 못한 부분은 (추정)으로 표기합니다.

🔬심층분석 ① 태그 수집 흐름

범위. 운영 기본값 application.collect.version=json_v1(application.yml:48) 경로를 중심으로, legacy는 대조군으로 병기한다. 데이터가 Redis 해시 문자열 → Map<String,Object>List<Tag> → InfluxDB Point로 변형되는 전 과정을 추적한다.

1) Redis 수집 소스 — 두 개의 해시

버전Redis 키(Hash)필드
json_v1Hash json_v1epoch-millis 타임스탬프 문자열JSON 문자열
legacyHash STATE상수 "STATE"^/, 구분 문자열

직렬화는 전부 StringRedisSerializer(RedisConfig.java:16-18) — Redis raw는 순수 문자열.

신선도(±2초) 판정 · 최신 키 선택 getLatestTimestampKey() (RedisService.java:28-54)

long currentTime = Instant.now().getEpochSecond() * 1000;         // 40
return keys.stream()
    .filter(key -> Math.abs(Long.parseLong(key) - currentTime) <= timeThreshold) // 47
    .max((k1,k2) -> Long.compare(Long.parseLong(k1), Long.parseLong(k2)))        // 52
    .orElse(null);
  • timeThreshold = application.redis.time-threshold-ms:2000. 현재 시각 ±2초 이내 키만 통과, 그중 가장 최근(max) 1건만 소비.
  • 누락 분류 getMissedTimestampKey()(:56-83): (currentTime - timestamp) > 2000 — 2초 넘게 과거인 키 전부를 오래된 순 List로. → 복구 대상.
⚠️ 확인된 미세 이슈 2가지
기준시각 초 절삭: getEpochSecond()*1000으로 밀리초를 버리고 ×1000 → 키(epoch-millis)와 비교 시 경계에서 최대 999ms 비대칭 오차 (추정: 의도치 않은 미세 버그). ② 판정 사각지대: 시계가 크게 앞서 timestamp가 미래이면서 2초 밖인 키는 latest(abs≤2000)·missed(차이>2000 양수) 어느 필터에도 안 잡혀 방치될 수 있음 (추정).

2) Collector 파싱 — 데이터 변형

JsonV1Collector (JsonV1Collector.java:135-193)

입력 JSON 스키마와 매핑:

{ "data":[ { "field":{ "<tagId>": <value> }, "tag":{ "PROC":"P1", "<k>":"<v>" } } ] }
   field 엔트리 → Tag.id = key, Tag.value = 숫자면 Double / 문자면 String
   tag.PROC    → Tag.process          (PROC 키는 influxTags에서 제외)
   tag의 나머지 → Tag.influxTags(Map)
  • 타입 판별 parseValueFromJson(:202-215): isNumber→Double, isTextual→String, isNull→스킵, 미지원 타입은 warn 후 스킵.
  • 성공/실패 모두 dataKey를 반환 Map에 포함(정리 단계에서 사용). line 필드는 수집 경로에서 세팅되지 않음(확인된 사실).

LegacyCollector (LegacyCollector.java:60-68)

data.split("\^")          // 행 구분 '^'
    → row.split(",")       // 열 구분 ','
    → Tag.builder().id(t[0]).value(Double.parseDouble(t[1])).build();
⚠️ legacy 파싱 취약
값을 무조건 Double.parseDouble → 문자/빈값이면 NumberFormatException, 배열 길이 부족 시 ArrayIndexOutOfBounds → 상위 catch(Exception)배치 전체를 실패 처리(부분 격리 없음). process/line/influxTags 미설정(메타 보강은 LegacyInfluxSaverTagInfoMap으로 수행 — 클래스 주석과 위치 불일치).

Tag.equals/hashCodeid 기준(Tag.java:75-86) → 동일 id 중복 시 Set/merge 충돌 여지.

3) 파이프라인 5단계 — 격리와 로깅 레벨

DataPipelineService@PostConstruct에서 PipelineComponentsFactory로 5개 컴포넌트를 생성해 보관(:44-53). processPipeline()(:59-119) 단계별 격리:

단계호출실패 시
1. 수집collector.collect()success=false/빈 tags → return(중단), warn
—. 가상태그virtualTagGenerator.generate()try/catch warn, 계속
—. mergemergeAlarmTags(tags, virtualTags)단순 concat(중복제거 없음)
2. 알람alarmCaller.fireAlarms(alarmTags, ts)try/catch warn
3. 저장influxSaver.save(tags, ts)try/catch error (유일)
4. 정리cleaner.cleanup(createCleanCommand(...))try/catch warn
  • 알람은 tags + virtualTags(alarmTags), 저장은 실물 tags 대상. 가상태그 저장은 generate() 내부에서 별도 수행(추정).
🔴 저장 실패 후 정리로 인한 유실
3단계 저장이 error로 실패해도 catch로 삼켜지고 4단계 정리가 그대로 실행되어 Redis 원본(dataKey)이 삭제된다. 이미 삭제됐으므로 누락 복구 대상에도 안 들어가 데이터 유실 가능 (추정: 위험 지점).

4) 스케줄러 · 누락 복구

스케줄러주기@Async재진입
JsonV1DataPipelineSchedulerfixedDelay=20ms없음비재진입(직렬)
LegacyDataPipelineSchedulercron 매초있음재진입 가능

누락 복구 MissedDataRecoveryService(@Scheduled(fixedDelay=10분), json_v1 전용): getMissedTimestampKey()로 오래된 키를 batchSize 60 단위 Lazy Stream으로 수집 → virtualTag→influx→clean 순 처리. 알람은 미발화(AlarmCaller 미주입, :52). legacy는 UnsupportedOperationException(빈 자체가 생성 안 되어 실경로 도달 불가).

5) 수집 흐름 시퀀스 (json_v1)

[JsonV1DataPipelineScheduler @Scheduled(fixedDelay=20ms)] 단일 스레드·비재진입 ▼ [DataPipelineService.processPipeline()] :59 1. collector.collect() JsonV1Collector.java:29 ├ getLatestTimestampKey() HLEN/HKEYS json_v1, |ts-now|≤2s, max RedisService:28 │ raw 필드명: "1720080000123" ├ getJsonTagData(key) HGET json_v1 <key> RedisService:85 │ raw: '{"data":[{"field":{"T1":12.3},"tag":{"PROC":"P1","LINE":"L1"}}]}' └ parseJsonData() → List<Tag>[ Tag{id=T1,value=12.3(Double), process="P1", influxTags={LINE=L1}} ] 반환 Map{tags, timestamp=Instant(key), success=true, dataKey=key} (실패 시 return → 파이프라인 중단) ─ virtualTagGenerator.generate(tags,ts) → virtualTags (warn 격리) ─ mergeAlarmTags → alarmTags = tags ⊕ virtualTags 2. alarmCaller.fireAlarms(alarmTags,ts) CompletableFuture×2 (공용 ForkJoinPool) 3. influxSaver.save(tags,ts) tags→Point(PROC/influxTags=tag, 값=field, WritePrecision.S) → writeApi.writePoints (error 격리) 4. cleaner.cleanup(dataKey) HDEL json_v1 <key> (warn 격리)
ℹ️ 성능 관찰
최신 경로 1회 = HLEN + HKEYS + HGET(1) + HDEL(1). HKEYS는 해시 전체 필드 스캔이라 적체 시 O(N) 비용이며 20ms 폴링과 결합 시 부하 요인. 병렬 스트림은 사용하지 않음(전부 순차).
⚠️ 설정 폴백 주의
운영 기본은 json_v1(yml)이나, 코드 기본값(matchIfMissing)은 legacy. collect.version 프로퍼티가 누락되면 legacy로 폴백된다.

🚨심층분석 ② 알람 프로세스

알람은 단일 태그다중 태그 두 계열이 거의 대칭 구조로 존재한다. 현재 application.alarm.use=false(yml)라 발화 경로 자체는 기본 비활성이나, 활성화 시 아래 생애주기로 동작한다.

구분단일 태그다중 태그
룰 도메인TagAlarmRuleMultiAlarmRule
인메모리 저장소TagRuleManagerMultiRuleManager
평가 서비스TagAlarmServiceMultiAlarmService
조건 소스upper/lower/exact 필드 → TagAlarmConditionFactoryDB expression 문자열 → AlarmExpressionParser

1) 인메모리 룰 셋 — tagId가 곧 식별자

Set<TagAlarmRule> tagRules       = Collections.newSetFromMap(new ConcurrentHashMap<>());
Set<TagAlarmRule> firedTagRules  = Collections.newSetFromMap(new ConcurrentHashMap<>());  // TagRuleManager:20-26
  • tagRules = 평가 대상 활성 룰 전체. firedTagRules = 이미 발화되어 억제 중인 룰의 clone() 사본.
  • equals/hashCodetagId 사용(TagAlarmRule.java:256-266) → tagId당 룰 1개. (Multi는 multiAlarmNo 기준)
  • 초기화 @PostConstruct init(): 재시작 잔여 deactivating 정리 → activated 룰 로드 → initialized=true. 이 플래그가 상태전이 스케줄의 조기 실행을 가드.

2) 표현식 DSL — 파싱 파이프라인

AlarmExpressionParser.parse()PreprocessorTokenizerRecursiveDescentParserCondition 트리.

expression      ::= term (OR term)*        // OR 최저 우선순위, 좌결합
term            ::= factor (AND factor)*   // AND > OR
factor          ::= singleCondition | '(' expression ')'   // 괄호 최고
singleCondition ::= TAG_ID  OP  (NUMBER | STRING)          // OP: == != > >= < <=
  • 전처리(Preprocessor): 단독 ===, 'ON'1, 'OFF'0(작은따옴표만).
  • 토큰화: {{...}}TAG_ID, "..."STRING, AND/OR은 대소문자 무시+경계 필요, 음수·소수 지원, 단독 =는 예외(전처리 전제).
  • {{TAG}}는 값 치환이 아니다 — 토큰에 tagId 문자열만 담고, 평가 시점에 tagValues.get(tagId)로 런타임 lookup(SingleCondition.java:44-49).
⚠️ 연산자 타입 규칙
==/!=는 숫자·문자 모두 지원하되 타입이 다르면 무조건 false. > >= < <=숫자(DoubleValue)만 — 문자열이 섞이면 UnsupportedOperationException. 이 예외가 상위 평가에서 legacy 폴백을 유발할 수 있음(아래 4).
🔎 예시 트레이스 {{TEMP}} > 80 AND {{PRESS}} >= 100
토큰:  TAG_ID(TEMP) GT(>) NUM(80) AND TAG_ID(PRESS) GE(>=) NUM(100) EOF
트리:  CompoundCondition(AND)
        ├ SingleCondition(TEMP,  >,  80.0)
        └ SingleCondition(PRESS, >=, 100.0)
평가:  {TEMP:85, PRESS:120} → 85>80(T) AND 120>=100(T) → true, winning=(PRESS>=100)
       {TEMP:85, PRESS:90}  → 첫 조건 T, 둘째 90>=100(F) → 단락평가로 즉시 false

3) 두 종류의 스케줄 — 혼동 금지

구분(A) 룰 라이프사이클(B) 값 평가·fire/clear
주기@Scheduled(fixedDelay=1000) (1초)파이프라인 20ms → fireAlarms
하는 일activating→룰 추가 / deactivating→룰 제거 (DB 폴링)실제 조건 평가로 발생/해제
스레드taskScheduler(풀 10)공용 ForkJoinPool

(A)는 "룰을 평가 대상에 넣고 빼는" 관리 작업일 뿐, fire/clear가 아니다.

4) fire / clear 전이 (값 평가)

핵심 evaluateAndFilterRulesWithAlarmValue(TagAlarmService.java:263-317) — rules.parallelStream():

  • 미발생→발생 addFiredRuleIfNew(:375-383): firedTagRules.contains면 억제(중복 차단), 아니면 clone()을 셋에 추가하고 신규 발화만 하류로 → occurred 로그 저장.
  • 발생→해제 resolveFiredTagRule(:385-395): 열린 로그를 resolved로 전환 후 셋에서 제거.
  • 디지털 이벤트성 예외: onToOff/offToOn 등은 자동 해제하지 않음(엣지 이벤트).
  • 억제/재활성: 매 실행 앞에서 removeInvalidFiredRules(now)canReactivate(suppression 경과, 기본 FOREVER) 룰을 firedTagRules에서 제거 → 재발화 허용.

발화 시 createAndSaveAlarmLogsoccurred 로그 saveAllpublishEvent(AlarmFiredEvent)(:539). 저장 실패는 catch만 하고 이벤트 미발행(조용히 유실).

5) 비동기 발화 · 알림 전송

// CommonAlarmCaller.java:37-38  (useAlarm=application.alarm.use, yml=false)
CompletableFuture.runAsync(() -> tagAlarmService.executeTagAlarm(tags, ts));
CompletableFuture.runAsync(() -> multiAlarmService.executeMultiAlarm(tags, ts));

스레드풀: runAsync(Runnable)는 Executor 미지정 → 공용 ForkJoinPool.commonPool(). 내부 parallelStream도 commonPool → 평가 태스크와 스트림이 같은 풀 공유. fire-and-forget이라 예외는 삼켜짐.

알림 AlarmNotificationEventListener:

실행@Async("alarmEventExecutor") + @TransactionalEventListener(AFTER_COMMIT, fallbackExecution=true)
대상RestClient POST {alarm.notification.url}/api/v1/alerts/notify (yml 10.123.202.122:8080)
페이로드{"alarms":[{alarmType, alarmLogNo}, ...]}
재시도없음 (catch 후 log.error만)
🔴 정정 — 타임아웃이 실제로는 미적용
클래스에 CONNECTION/READ_TIMEOUT=3000 상수와 createRequestFactory()가 있으나, 생성자에서 그 팩토리 사용 라인이 주석 처리되고 new SimpleClientHttpRequestFactory()를 그대로 쓴다 → 타임아웃 미설정(사실상 무한 대기). 알림 서버가 지연되면 alarmEventExecutor(max 10/queue 100/CallerRunsPolicy) 스레드가 묶이고, 큐 초과 시 커밋 직후 스레드가 대신 실행되어 지연이 전파될 수 있다. (요약 §4·§7의 "타임아웃 3s"는 상수 존재 기준 표기 — 실제 적용은 이 정정을 따른다.)

6) REST API · 생애주기 시퀀스

컨트롤러엔드포인트
TagAlarmRuleController
/api/v1/alarm/tag-alarm
POST /reactivate(firedTagRules 제거), GET /statusregister/unregister 없음(DB status 폴링으로 반영)
MultiAlarmRuleController
/api/v1/alarm/multi-alarm
POST /(register), DELETE /?tid=, POST /reactivate, GET /status
[부팅] @PostConstruct init(): deactivating 정리 → activated 로드 → initialized=true [룰 관리] @Scheduled(1s): activating→룰 추가 / deactivating→룰 제거 [값 평가] @Scheduled(20ms) → fireAlarms (useAlarm=false면 skip) CompletableFuture.runAsync → [ForkJoinPool.commonPool] removeInvalidFiredRules(now) // 억제 경과분 해제 rules.parallelStream(): evaluate → firing&신규 ? firedTagRules.add(clone)→occurred 로그 firing&기존 ? 억제(skip) !firing ? resolveFiredTagRule(resolved) publishEvent(AlarmFiredEvent) [알림] AFTER_COMMIT + @Async("alarmEventExecutor") → RestClient POST /api/v1/alerts/notify {"alarms":[...]} (타임아웃 미적용 / 재시도 없음 / 실패 log.error)

7) 동시성 주의점 (요약)

#항목
1commonPool 미격리·무역압: fireAlarmsrunAsync×2와 내부 parallelStream이 전부 공용 ForkJoinPool. 20ms 폴링이 fire-and-forget 제출 → 평가가 20ms 초과 시 태스크 누적.
2셋 복합연산 비원자성: 상태전이(1s)의 addAll/removeAll과 평가의 contains→add(addFiredRuleIfNew)가 같은 셋을 동시 수정. ConcurrentHashMap이라 개별 연산은 안전하나 복합 연산은 비원자적 → 짧은 창에서 중복 fire/해제 교차 여지 (추정).
3룰 객체 가변 상태: currentValue/lastValue/firedTime이 평가 중 뮤테이트. 20ms 겹침 실행 시 상태 시퀀스 엉킴 가능. firedTagRulesclone()이라 로그 스냅샷은 보존 (추정).
4폴백 재평가 부작용: AlarmValue 경로가 예외 시 legacy 전체 재실행 → 예외 이전 firedTagRules 부분 변경과 겹칠 수 있음(멱등성 미보장, 추정).