DM Log

[Modbus] #1 Modbus 소개 및 Modbus TCP test 통신 구축 본문

개발공부/Data Collection

[Modbus] #1 Modbus 소개 및 Modbus TCP test 통신 구축

Dev. Dong 2025. 2. 8. 17:55

Modbus 소개 및 Modbus TCP test 통신 구축


Modbus란?

PLC와 함께 사용하기 위한 데이터 통신 프로토콜으로, 산업용 전자 장치를 연결하는 방법이다. 프로토콜 전송 계층으로 케이블이나 이더넷 네트워크 통신을 지원한다. Modbus 명령을 통해 Coil과 Holding register에 포함된 값 변경, I/O 포트 읽기, Coil과 Holding 레지스터에 포함된 값을 다른 장치에 보내지도록 설정 가능하다.

 

Modbus 객체 유형

객체 유형 접근 주소 공간
Coil Read-Write 00001 - 09999
Discrete input Read-only 10001 - 19999
Input register Read-only 30001 - 39999
Holding register Read-Write 40001 - 49999

 

Modbus RTU 와 TCP 방식 비교

항목 Modbus RTU Modbus TCP
통신 방식 직렬 통신 (RS-232, RS-485) 이더넷 (Ethernet, TCP/IP)
전송 프로토콜 RTU 형식의 데이터 전송  TCP/IP 패킷을 통해 데이터 전송
테이터 구조 CRC 기반 프레임 구조 (Start → Address →
Function → Data → CRC → Stop)
TCP/IP 기반 (MBAP 헤더 포함)
네트워크 구조 마스터-슬레이브 (1:N 구조) 클라이언트-서버 (N:N 가능)

Modbus TCP test 통신 구축

Python Modbus 라이브러리를 사용하여 test 용 Modbus TCP 슬레이브 (데이터를 전송하거나 상태값을 제공) 구축

개발 진행 방향

- pymodbus 2.5.3 라이브러리 사용

- Coil 0번 주소에 3초마다 설비가 동작 유무에 대한 데이터(0 / 1)를 랜덤으로 변경하여 프로토콜 전송

- Coil 1번주소에 3초마다 알람 발생 유무에 대한 데이터(0 / 1)를 랜덤으로 변경하여 프로토콜 전송

 

# python 3.8.10 / pymodbus 2.5.3

import random
import time
import threading
from pymodbus.server.sync import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext

# 초기 레지스터 값 설정
store = ModbusSlaveContext(
    di=ModbusSequentialDataBlock(0, [0] * 100), # Discrete Inputs
    co=ModbusSequentialDataBlock(0, [0] * 2),  # Coils
    hr=ModbusSequentialDataBlock(0, [0] * 100),  # Holding Registers
    ir=ModbusSequentialDataBlock(0, [0] * 100)  # Input Registers
)

context = ModbusServerContext(slaves=store, single=True)

# 랜덤 데이터 업데이트 함수
def update_modbus_data():
    while True:
        # Coils(알람 값) 0번 주소에 True 또는 False 랜덤 저장
        alarm_value = random.choice([0, 1])
        store.setValues(1, 0, [alarm_value])
        # Coils(동작 값) 0번 주소에 True 또는 False 랜덤 저장
        run_value = random.choice([0, 1])
        store.setValues(1, 1, [run_value])

        print(f"[MODBUS UPDATE] Alarm: {alarm_value}, Value: {run_value}")
        time.sleep(3) 

threading.Thread(target=update_modbus_data, daemon=True).start()

# MODBUS TCP 서버 실행
print("Starting MODBUS TCP Server on port 502...")
StartTcpServer(context, address=("0.0.0.0", 502))