IT 개발,관리,연동,자동화

NestJS를 활용한 MQTT, Modbus 기반 실시간 데이터 수집 및 처리 시스템 구축 가이드

_Blue_Sky_ 2025. 12. 18. 20:46
728x90

 

1. 시스템 아키텍처 개요

이 시스템은 다양한 환경(공장, 건물 등)에서 발생하는 계량기 데이터를 수집하고 중앙 서버에 통합하는 구조를 가집니다.

  • Data Source Layer: 전력, 가스, 수도 계량기 (MQTT 기반 무선 센서 또는 Modbus RTU/TCP 유선 장치).
  • Ingestion Layer (NestJS):
    • MQTT: 이벤트 기반 수신 (Push 방식).
    • Modbus: 스케줄링 기반 폴링 (Pull 방식).
  • Storage Layer: Prisma ORM을 통해 MySQL에 시계열 데이터 저장.
  • Presentation Layer: REST API 및 WebSockets을 통해 대시보드(React/Vue)에 실시간 데이터 전달.

2. 세부 구현 가이드

① 프로젝트 기반 설정

NestJS 기본 프로젝트 설치 및 데이터 모델링을 수행합니다.

# 1. NestJS CLI 설치 및 프로젝트 생성
npm i -g @nestjs/cli
nest new energy-monitor-system

# 2. 필수 라이브러리 설치
# mqtt: MQTT 통신, modbus-serial: Modbus 통신, prisma: ORM
npm install @prisma/client mqtt modbus-serial @nestjs/schedule
npm install -D prisma

Prisma Schema (prisma/schema.prisma):

데이터의 일관성을 위해 센서 타입과 위치 등을 구분할 수 있도록 모델을 설계합니다.

model MeterData {
  id        Int      @id @default(autoincrement())
  deviceType String   // 예: 'ELECTRIC', 'WATER', 'GAS'
  value     Float    // 계측값
  topic     String   // 소스 식별 (MQTT Topic 또는 Modbus Address)
  timestamp DateTime @default(now())

  @@index([timestamp]) // 조회 성능을 위한 인덱스
}

② 데이터 수집 서비스 구현

[MQTT] 실시간 이벤트 수신

MQTT는 브로커(Mosquitto 등)를 통해 데이터가 들어올 때마다 즉시 반응합니다.

// src/mqtt/mqtt.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import * as mqtt from 'mqtt';
import { PrismaService } from '../prisma/prisma.service';

@Injectable()
export class MqttService implements OnModuleInit {
  private client: mqtt.MqttClient;

  constructor(private prisma: PrismaService) {}

  onModuleInit() {
    this.client = mqtt.connect('mqtt://broker.hivemq.com'); // 브로커 주소

    this.client.on('connect', () => {
      this.client.subscribe('energy/meter/#'); // 와일드카드 구독
      console.log('✅ MQTT Connected and Subscribed');
    });

    this.client.on('message', async (topic, message) => {
      try {
        const payload = JSON.parse(message.toString());
        await this.prisma.meterData.create({
          data: {
            deviceType: 'MQTT_SENSOR',
            value: payload.value,
            topic: topic,
          },
        });
      } catch (err) {
        console.error('MQTT Parsing Error:', err.message);
      }
    });
  }
}

[Modbus] 주기적 데이터 폴링

Modbus는 서버가 장치에 "데이터를 달라"고 요청해야 합니다. @nestjs/schedule을 사용하여 구현합니다.

// src/modbus/modbus.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import * as ModbusRTU from 'modbus-serial';
import { PrismaService } from '../prisma/prisma.service';
import { Cron, CronExpression } from '@nestjs/schedule';

@Injectable()
export class ModbusService implements OnModuleInit {
  private client = new ModbusRTU();

  constructor(private prisma: PrismaService) {}

  async onModuleInit() {
    // PLC 또는 게이트웨이 IP 설정
    await this.client.connectTCP('192.168.1.100', { port: 502 });
    this.client.setID(1); // 슬레이브 ID 설정
    console.log('✅ Modbus TCP Connected');
  }

  @Cron(CronExpression.EVERY_10_SECONDS)
  async pollData() {
    try {
      // 0번 레지스터부터 2개 읽기 (Holding Registers)
      const data = await this.client.readHoldingRegisters(0, 2);
      await this.prisma.meterData.create({
        data: {
          deviceType: 'MODBUS_DEVICE',
          value: data.data[0],
          topic: 'register/0',
        },
      });
    } catch (e) {
      console.error('Modbus Polling Failed', e);
    }
  }
}

③ 데이터 조회 API (REST)

프론트엔드 대시보드에서 최근 트렌드를 확인할 수 있는 엔드포인트를 제공합니다.

// src/data/data.controller.ts
@Controller('api/v1/meter')
export class MeterController {
  constructor(private readonly prisma: PrismaService) {}

  @Get('recent')
  async getRecentData() {
    return this.prisma.meterData.findMany({
      take: 50,
      orderBy: { timestamp: 'desc' },
    });
  }
}

3. 시스템 고도화 방안 (실무 팁)

구분 전략
실시간성 REST API 폴링 대신 Socket.io를 사용하여 서버에서 브라우저로 데이터를 즉시 Push합니다.
데이터 압축 데이터가 너무 많아질 경우 TimescaleDBInfluxDB 같은 시계열 DB 사용을 검토합니다.
장애 조치 통신 단절 시 데이터 유실 방지를 위해 Redis Pub/Sub이나 메시지 큐를 완충 지대로 활용합니다.
유효성 검사 class-validator를 사용하여 수신된 센서 데이터가 정상 범위 내에 있는지 체크합니다.

4. 결론

이 구조는 NestJS의 강력한 의존성 주입(DI) 덕분에 새로운 프로토콜(예: BACnet, OPC-UA)이 추가되더라도 기존 코드를 크게 수정하지 않고 모듈만 추가하여 확장할 수 있는 장점이 있습니다.


NestJS를 기반으로 1) Socket.io를 이용한 실시간 데이터 푸시2) 특정 수치 초과 시 슬랙(Slack) 알림 기능을 추가하여 시스템을 완성해 보겠습니다.


1. 실시간 데이터 푸시 (Socket.io)

사용자가 대시보드를 새로고침하지 않아도, 데이터가 수집되는 즉시 화면에 업데이트되도록 웹소켓을 설정합니다.

패키지 설치:

npm install @nestjs/websockets @nestjs/platform-socket.io socket.io

Gateway 구현 (src/events/events.gateway.ts):

import { WebSocketGateway, WebSocketServer } from '@nestjs/websockets';
import { Server } from 'socket.io';

@WebSocketGateway({
  cors: { origin: '*' }, // 프론트엔드 주소에 맞춰 설정
})
export class EventsGateway {
  @WebSocketServer()
  server: Server;

  // 새로운 데이터를 클라이언트로 전송하는 메서드
  broadcastMeterData(data: any) {
    this.server.emit('new_meter_data', data);
  }
}

2. 이상치 감지 및 슬랙 알림 (Slack Webhook)

데이터를 저장할 때 수치를 확인하여, 임계치(예: 100)를 넘으면 관리자에게 알림을 보냅니다.

패키지 설치:

npm install @slack/webhook

Alert 서비스 구현 (src/common/alert.service.ts):

import { Injectable } from '@nestjs/common';
import { IncomingWebhook } from '@slack/webhook';

@Injectable()
export class AlertService {
  private webhook: IncomingWebhook;

  constructor() {
    // 슬랙에서 생성한 Webhook URL을 넣습니다.
    this.webhook = new IncomingWebhook('https://hooks.slack.com/services/YOUR/WEBHOOK/URL');
  }

  async sendSlackAlert(topic: string, value: number) {
    await this.webhook.send({
      text: `🚨 *이상 데이터 감지!*`,
      attachments: [{
        color: 'danger',
        fields: [
          { title: '장치/토픽', value: topic, short: true },
          { title: '현재 수치', value: value.toString(), short: true },
          { title: '발생 시간', value: new Date().toLocaleString(), short: false },
        ],
      }],
    });
  }
}

3. 최종 서비스 로직 통합 (MqttService 수정)

데이터가 들어왔을 때 [DB 저장 → 소켓 전송 → 슬랙 알림 체크] 프로세스를 한 번에 처리합니다.

// src/mqtt/mqtt.service.ts (주요 로직)
import { Injectable } from '@nestjs/common';
import { EventsGateway } from '../events/events.gateway';
import { AlertService } from '../common/alert.service';
import { PrismaService } from '../prisma/prisma.service';

@Injectable()
export class MqttService {
  constructor(
    private prisma: PrismaService,
    private eventsGateway: EventsGateway, // 소켓 게이트웨이 주입
    private alertService: AlertService    // 알림 서비스 주입
  ) {
    // ... MQTT 연결 설정 (생략) ...

    this.client.on('message', async (topic, message) => {
      const payload = JSON.parse(message.toString());
      const value = payload.value;

      // 1. 데이터베이스 저장
      const newData = await this.prisma.meterData.create({
        data: { value, topic, deviceType: 'MQTT' }
      });

      // 2. 웹소켓으로 실시간 브로드캐스트
      this.eventsGateway.broadcastMeterData(newData);

      // 3. 임계치 체크 (예: 100 초과 시 슬랙 알림)
      if (value > 100) {
        await this.alertService.sendSlackAlert(topic, value);
      }
    });
  }
}

4. 프론트엔드 연동 예시 (React)

클라이언트에서 실시간으로 데이터를 수신하는 방법입니다.

import { useEffect, useState } from 'react';
import { io } from 'socket.io-client';

const socket = io('http://localhost:3000'); // 서버 주소

function Dashboard() {
  const [meterList, setMeterList] = useState([]);

  useEffect(() => {
    socket.on('new_meter_data', (newData) => {
      console.log('실시간 데이터 도착:', newData);
      setMeterList((prev) => [newData, ...prev].slice(0, 20)); // 최신 20개 유지
    });

    return () => socket.off('new_meter_data');
  }, []);

  return (
    <div>
      <h2>실시간 계량기 현황</h2>
      <ul>
        {meterList.map(item => (
          <li key={item.id}>{item.topic}: {item.value} (수신시간: {new Date(item.timestamp).toLocaleTimeString()})</li>
        ))}
      </ul>
    </div>
  );
}

요약 및 장점

  1. 실시간성 확보: Socket.io를 통해 5초 폴링 방식보다 훨씬 빠른 밀리초 단위의 실시간성을 구현했습니다.
  2. 사고 예방: AlertService를 분리하여 데이터 수집 즉시 슬랙으로 알림을 보내므로, 관리자가 현장 문제를 즉각 인지할 수 있습니다.
  3. 결합도 낮춤: 각 기능을 별도의 서비스(Gateway, AlertService)로 분리하여 코드의 유지보수성을 높였습니다.

Nuxt.js(Nuxt 3)는 Vue.js 기반의 강력한 프레임워크로, 실시간 데이터를 처리하는 대시보드 UI를 구축하기에 최적입니다. 앞서 만든 NestJS 서버와 연동하여 실시간 차트와 데이터 리스트를 보여주는 UI를 구현해 보겠습니다.


1. 프로젝트 설정 및 라이브러리 설치

먼저 Nuxt 3 프로젝트를 생성하고, 실시간 통신을 위한 socket.io-client와 차트 시각화를 위한 chart.js 관련 라이브러리를 설치합니다.

npx nuxi@latest init energy-dashboard
cd energy-dashboard
npm install
npm install socket.io-client chart.js vue-chartjs

2. 실시간 데이터 관리를 위한 composable 작성

서버와의 소켓 연결을 관리하는 로직을 별도로 분리하여 재사용 가능하게 만듭니다.

composables/useSocket.ts:

import { io, Socket } from 'socket.io-client';
import { ref, onBeforeUnmount } from 'vue';

export const useSocket = () => {
  const socket = ref<Socket | null>(null);
  const realTimeData = ref<any[]>([]);

  const connect = () => {
    socket.value = io('http://localhost:3000'); // NestJS 서버 주소

    socket.value.on('new_meter_data', (data) => {
      // 최신 데이터를 배열 앞부분에 추가 (최대 10개 유지)
      realTimeData.value = [data, ...realTimeData.value].slice(0, 10);
    });
  };

  const disconnect = () => {
    if (socket.value) socket.value.disconnect();
  };

  return { socket, realTimeData, connect, disconnect };
};

3. 메인 대시보드 UI 구현

app.vue에서 차트와 실시간 리스트를 결합한 UI를 구성합니다.

 

IoT 실시간 에너지 모니터링

MQTT & Modbus 데이터 실시간 수신 중

전력 소비 트렌드

최근 수신 데이터

토픽/소스 측정값 시간
{{ item.topic }} {{ item.value }} kWh {{ new Date(item.timestamp).toLocaleTimeString() }}
import { Line as LineChart } from 'vue-chartjs'
import { Chart as ChartJS, Title, Tooltip, Legend, LineElement, CategoryScale, LinearScale, PointElement } from 'chart.js'

ChartJS.register(Title, Tooltip, Legend, LineElement, CategoryScale, LinearScale, PointElement)

const { realTimeData, connect, disconnect } = useSocket();

// 차트 데이터 계산 (최신 10개 데이터를 시각화)
const chartData = computed(() => ({
  labels: realTimeData.value.map(d => new Date(d.timestamp).toLocaleTimeString()).reverse(),
  datasets: [{
    label: '에너지 측정값',
    backgroundColor: '#3b82f6',
    borderColor: '#3b82f6',
    data: realTimeData.value.map(d => d.value).reverse(),
    tension: 0.4
  }]
}))

const chartOptions = { responsive: true, maintainAspectRatio: false }

onMounted(() => connect());
onBeforeUnmount(() => disconnect());

4. 핵심 기능 요약

  1. Reactive UI: computed 속성을 사용하여 realTimeData가 소켓을 통해 업데이트될 때마다 차트가 자동으로 다시 그려집니다.
  2. Clean Architecture: useSocket이라는 Composable을 통해 비즈니스 로직(통신)과 UI(컴포넌트)를 분리했습니다.
  3. User Experience: 데이터 테이블에 hover 효과와 time-string 포맷팅을 적용하여 가독성을 높였습니다.

대시보드의 완성도를 높이기 위해 1) 날짜별 히스토리 조회 페이지2) 여러 장치를 한눈에 비교하는 카드 UI를 Nuxt 3로 구현해 보겠습니다.


1. 여러 장치 비교를 위한 멀티 카드 UI

대시보드 상단에 위치하여 현재 활성화된 장치들의 실시간 상태(전기, 가스, 수도 등)를 요약해서 보여줍니다.

components/DeviceStatusCard.vue:

<template>
  <div class="grid grid-cols-1 md:grid-cols-3 gap-4 mb-8">
    <div v-for="device in devices" :key="device.name" 
         class="bg-white p-5 rounded-xl shadow-sm border-l-4" 
         :class="device.color">
      <div class="flex justify-between items-center">
        <span class="text-sm font-medium text-gray-500">{{ device.name }}</span>
        <span class="text-xs px-2 py-1 rounded-full bg-green-100 text-green-700">Active</span>
      </div>
      <div class="mt-2 flex items-baseline">
        <span class="text-2xl font-bold text-gray-800">{{ device.value }}</span>
        <span class="ml-1 text-sm text-gray-500">{{ device.unit }}</span>
      </div>
    </div>
  </div>
</template>

<script setup>
const props = defineProps(['realTimeData']);

const devices = computed(() => [
  { name: '전력 (MQTT)', unit: 'kWh', color: 'border-blue-500', 
    value: props.realTimeData.find(d => d.topic.includes('energy'))?.value || 0 },
  { name: '가스 (Modbus)', unit: 'm³', color: 'border-orange-500', 
    value: props.realTimeData.find(d => d.topic.includes('gas'))?.value || 0 },
  { name: '수도 (Modbus)', unit: 'L', color: 'border-cyan-500', 
    value: props.realTimeData.find(d => d.topic.includes('water'))?.value || 0 },
]);
</script>


2. 히스토리 조회 페이지 (기간별 검색)

과거 데이터를 API로 요청하여 테이블과 차트로 보여주는 별도 페이지입니다.

pages/history.vue:

<template>
  <div class="p-8">
    <div class="flex justify-between items-center mb-6">
      <h1 class="text-2xl font-bold">데이터 이력 조회</h1>
      <NuxtLink to="/" class="text-blue-500 hover:underline">실시간 대시보드로 돌아가기</NuxtLink>
    </div>

    <div class="bg-white p-4 rounded-lg shadow-sm mb-6 flex gap-4 items-end">
      <div>
        <label class="block text-sm mb-1">시작일</label>
        <input type="date" v-model="filter.start" class="border p-2 rounded" />
      </div>
      <div>
        <label class="block text-sm mb-1">종료일</label>
        <input type="date" v-model="filter.end" class="border p-2 rounded" />
      </div>
      <button @click="fetchHistory" class="bg-blue-600 text-white px-6 py-2 rounded hover:bg-blue-700">
        조회하기
      </button>
    </div>

    <div class="bg-white rounded-lg shadow overflow-hidden">
      <table class="w-full">
        <thead class="bg-gray-50">
          <tr>
            <th class="p-3 text-left">시간</th>
            <th class="p-3 text-left">장치 토픽</th>
            <th class="p-3 text-left">측정값</th>
          </tr>
        </thead>
        <tbody>
          <tr v-for="item in historyData" :key="item.id" class="border-t">
            <td class="p-3">{{ new Date(item.timestamp).toLocaleString() }}</td>
            <td class="p-3 text-gray-600">{{ item.topic }}</td>
            <td class="p-3 font-semibold">{{ item.value }}</td>
          </tr>
          <tr v-if="historyData.length === 0">
            <td colspan="3" class="p-10 text-center text-gray-400">조회된 데이터가 없습니다.</td>
          </tr>
        </tbody>
      </table>
    </div>
  </div>
</template>

<script setup>
const filter = ref({ start: '', end: '' });
const historyData = ref([]);

const fetchHistory = async () => {
  // NestJS 서버의 API 호출 (Prisma findMany 사용 엔드포인트)
  const { data } = await useFetch(`http://localhost:3000/api/v1/meter/history`, {
    query: { start: filter.value.start, end: filter.value.end }
  });
  if (data.value) historyData.value = data.value;
};
</script>

3. NestJS 백엔드 추가 (히스토리 API)

히스토리 조회를 위해 백엔드 컨트롤러에 기간 필터링 로직을 추가해야 합니다.

src/data/data.controller.ts:

@Get('history')
async getHistory(
  @Query('start') start: string,
  @Query('end') end: string
) {
  return this.prisma.meterData.findMany({
    where: {
      timestamp: {
        gte: start ? new Date(start) : undefined,
        lte: end ? new Date(end) : undefined,
      },
    },
    orderBy: { timestamp: 'desc' },
  });
}

요약 및 활용

  • 멀티 카드 UI: 장비별 현재 상태를 직관적으로 파악할 수 있습니다. (메인 화면용)
  • 히스토리 페이지: 과거 특정 시점의 장애 발생이나 사용량 패턴을 분석할 때 필수적입니다.
  • 컴포넌트 구조: Nuxt의 components/와 pages/ 구조를 활용해 대규모 프로젝트로 확장 가능한 형태를 갖췄습니다.
728x90