AWS IoT Core Device Shadow를 알아보자
AWS IoT Core에서 빠질 수 없는 Device Shadow에 대해 알아보고 작성자 본인이 디바이스 섀도를 적용하기 위해 수행했던 각종 경험담과 실습 코드를 통해 디바이스 섀도를 이해해 보겠다.
AWS IoT Core Device Shadow
AWS IoT Core Device Shadow
1. AWS IoT Core Device Shadow란?
AWS IoT Device Shadow 서비스는 AWS IoT 사물 객체에 그림자를 추가합니다. 섀도는 장치가 연결되어 있는지 여부에 관계없이 앱 및 기타 서비스에서 장치의 상태를 사용할 수 있도록 할 수 AWS IoT 있습니다. AWS IoT 사물 객체에는 명명된 그림자가 여러 개 있을 수 있으므로 IoT 솔루션에서 장치를 다른 앱 및 서비스에 연결하는 데 사용할 수 있는 더 많은 옵션을 제공할 수 있습니다.
AWS IoT Device Shadow 서비스 - AWS IoT Core
업데이트 요청 권한은 신뢰할 수 있는 앱 및 디바이스로 제한되어야 합니다. 이렇게 하면 섀도우의 상태 속성이 예기치 않게 변경되는 것을 방지할 수 있습니다. 그렇지 않으면 섀도우를 사용하
docs.aws.amazon.com
위 내용은 AWS IoT Core 개발자 안내서에서 소개하는 Device Shadow에 대한 내용이다. 한국어로 번역이 되어 있는데 영어를 번역한 거라 어색한 느낌이 든다.
내가 Deivce Shadow를 검토하게 된 배경이 있다. 회사에서 구현하려는 IoT 기기의 경우 수면 환경을 센싱 하여 센서 데이터를 클라우드 서버로 전송하여 DB에 저장하겠다는 것이었는데 최초 논의에서는 단순히 데이터만 저장하겠다는 요구사항이었기 때문에 AWS IoT Core를 도입하지 않고 그냥 기존의 API서버를 사용하겠다는 생각을 가지고 개발을 진행했다.
하지만 개발자의 숙명일까? 프로젝트가 한참 진행 중일 때 윗선의 개입 혹은 툭하고 던진 아이디어로 인해 요구사항이 급격하게 변화하는 경험을 했다. 앱을 통해서 IoT 기기의 조명과 같은 특정 기능의 값을 변경할 수 있었으면 한다는 요구가 있었다. 이 하나의 기능으로 인하여 급격하게 개발 포인트가 변하게 되었다. 기존 개발에서는 IoT 기기가 HTTP통신을 통해 데이터를 보내는 것으로 가닥을 잡고 펌웨어 개발자와 개발방향을 잡았다. 하지만 기기의 조작이 들어가는 순간 HTTP통신을 사용하겠다는 방법이 문제가 되었다. IoT 기기로부터 일방적으로 보내지는 데이터를 저장만 하면 되던 최초 기획에서 이제는 클라우드 서버를 통한 기기의 제어와 연관이 된 순간이었고 이때 AWS IoT Core를 적용하는 것으로 결정을 했다.
처음에는 Device Shadow에 대해서 구현을 하는 것이 막막했다. AWS IoT Core가 다른 AWS의 서비스에 비해 자주 언급되는 서비스는 아니라 Device Shadow에 대한 레퍼런스가 부족했고 사실 내가 Device Shadow에 대해서 이해를 한 것이 아니었다. 물론 지금도 잘 못 이해하고 있을 수 있다.
그래서 내가 이해하고 있는 디바이스 섀도를 설명하자면 디바이스 섀도는 IoT 기기의 상태를 따라 하는 그림자이다. 진짜 말 그대로 그림자인 것이다. 다른 설명으로는 기기의 상태를 확인하고 조작하기 위한 가상의 기기이다.
그렇다 결국은 Device Shadow는 가상기기이다. 나는 Device Shadow를 구현하면서 Device Shadow가 가상기기라는 것을 인지하지 못했다. 그러니 분명 변경 명령을 내렸는데 상태가 변하지 않는 것을 보고 문제가 있다고 생각을 한 것이다. 사실 MQTT 토픽으로 보면 이미 명령이 보내져 가상기기의 상태는 변경되었지만 이를 수신받는 IoT 기기에서 이를 제대로 수신받지 못하고 또한 처리하지 못하고 있었는데도 말이다.
2. Device Shadow를 적용해 보자
이제 실제 Device Shadow를 적용해 보겠다. 실습 환경으로는 윈도우의 경우 WSL환경에 설치된 리눅스에서 맥북이라면 그냥 진행해도 무방하다.
아래는 내가 Device Shadow의 흐름을 이해할 수 있도록 도와주었던 이미지를 가져와 봤다.
Device Shadows - MQTT Topics :: IoT Atlas
Device Shadows - MQTT Topics Device Shadow is a service of AWS IoT Core. Shadows can make a device’s state available via MQTT topics whether the device is connected or not. This allows Apps and Services to interact with the device state at any time. Use
iotatlas.net
먼저 크게 보자면 연결을 진행하는 부분과 업데이트 명령을 처리하는 부분으로 볼 수 있다. 이 부분은 코드를 보면서 설명하겠다. 전체 코드를 보자
from time import sleep
from awscrt import io, mqtt, exceptions
from awsiot import iotshadow, mqtt_connection_builder
from concurrent.futures import Future
import json
import sys
import os
from threading import Thread
import logging
import traceback
from random import randint
from datetime import datetime
from uuid import uuid4
endpoint = <개인의 AWS IoT Core 엔드포인트 주소>
cert = <AWS IoT Core에 등록된 인증서>
key = <AWS IoT Core에 등록된 private 키>
root_ca = <AWS Root CA>
client_id = <client 이름(나는 thing_name과 같이 사용했다.)>
thing_name = <AWS IoT Core에 등록된 IoT Thing Name>
shadow_property = "color"
SHADOW_VALUE_DEFAULT = "red"
device_state_file = "device_state.json"
device_id = 1
user_id = 1
def read_device_state():
if os.path.exists(device_state_file):
with open(device_state_file, 'r') as file:
return json.load(file)
return {shadow_property: SHADOW_VALUE_DEFAULT}
def write_device_state(state):
with open(device_state_file, 'w') as file:
json.dump(state, file)
def exit_program(msg_or_exception):
if isinstance(msg_or_exception, Exception):
logger.error("예외로 인해 종료됨", exc_info=msg_or_exception)
else:
logger.info(f"종료됨: {msg_or_exception}")
if mqtt_connection:
mqtt_connection.disconnect().result()
sys.exit()
# 파일이 존재하고 읽을 수 있는지 확인하는 함수
def check_file_access(file_path):
if os.path.exists(file_path):
if os.access(file_path, os.R_OK):
print(f"파일 '{file_path}' 존재하며 읽기 가능합니다.")
else:
print(f"파일 '{file_path}' 존재하지만 읽기 권한이 없습니다.")
else:
print(f"파일 '{file_path}' 존재하지 않습니다.")
# 각 파일에 대해 체크
check_file_access(cert)
check_file_access(key)
check_file_access(root_ca)
# 로깅 설정
io.init_logging(getattr(io.LogLevel, io.LogLevel.Debug.name), 'stderr')
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 일반 SDK 로깅
logger = logging.getLogger("mqtt_client")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger.addHandler(handler)
# 연결 콜백
def on_connection_interrupted(connection, error, **kwargs):
logger.warning(f"연결 중단됨: {error}")
def on_connection_resumed(connection, return_code, session_present, **kwargs):
logger.info(f"연결 재개됨: {return_code}")
# 재연결 시 섀도우 상태 다시 요청
request_shadow_state()
def on_get_shadow_accepted(response):
try:
logger.info("섀도우 상태를 성공적으로 가져왔습니다.")
if response.state:
if response.state.desired and shadow_property in response.state.desired:
desired_value = response.state.desired[shadow_property]
logger.info(f"기기가 시작될 때 원하는 상태는 '{desired_value}' 입니다.")
update_device_state(desired_value)
elif response.state.reported and shadow_property in response.state.reported:
reported_value = response.state.reported[shadow_property]
logger.info(f"기기가 시작될 때 보고된 상태는 '{reported_value}' 입니다.")
update_device_state(reported_value)
else:
logger.info("섀도우 상태에서 적용할 수 있는 값이 없습니다.")
else:
update_device_state(SHADOW_VALUE_DEFAULT)
except Exception as e:
exit_program(e)
def on_get_shadow_rejected(error):
logger.error(f"섀도우 상태를 가져오는 데 실패했습니다: code={error.code}, message={error.message}")
if error.code == 404:
update_device_state(SHADOW_VALUE_DEFAULT)
publish_shadow_update(SHADOW_VALUE_DEFAULT)
else:
exit_program(f"Get 요청이 거부되었습니다: code={error.code}, message={error.message}")
def on_shadow_delta_updated(delta):
try:
logger.info("Delta 업데이트 수신")
if delta.state and shadow_property in delta.state:
shadow_value = delta.state[shadow_property]
update_device_state(shadow_value)
except Exception as e:
exit_program(e)
def on_update_shadow_accepted(response):
logger.info("섀도우 업데이트가 수락되었습니다.")
def on_update_shadow_rejected(error):
logger.error(f"업데이트 요청이 거부되었습니다: code={error.code}, message={error.message}")
def update_device_state(value):
local_state = read_device_state()
if local_state[shadow_property] != value:
local_state[shadow_property] = value
write_device_state(local_state)
logger.info(f"디바이스 상태를 {value}로 업데이트했습니다.")
publish_shadow_update(value)
def publish_shadow_update(value):
token = str(uuid4())
request = iotshadow.UpdateShadowRequest(
thing_name=thing_name,
state=iotshadow.ShadowState(
reported={shadow_property: value},
desired=None # desired 상태는 설정하지 않음 (기기의 현재 상태만 보고)
),
client_token=token,
)
shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)
def request_shadow_state():
logger.info("섀도우 상태 요청")
try:
publish_get_future = shadow_client.publish_get_shadow(
request=iotshadow.GetShadowRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE
)
publish_get_future.result()
except Exception as e:
exit_program(e)
# 센서 데이터를 주기적으로 전송하는 함수
def publish_sensor_data():
while True:
t = datetime.utcnow()
time_in_seconds = int((t - datetime(1970, 1, 1)).total_seconds())
sensor_data = {
"device_id": device_id,
"user_id": user_id,
"temperature": randint(-20, 50),
"humidity": randint(0, 100),
"noise": randint(30, 100),
"light": randint(0, 1000),
"TimeStamp": time_in_seconds
}
topic = "basestation/sensor/data"
mqtt_connection.publish(
topic=topic,
payload=json.dumps(sensor_data),
qos=mqtt.QoS.AT_LEAST_ONCE
)
logger.info(f"센서 데이터 게시: {sensor_data}")
sleep(5)
# MQTT 클라이언트 생성
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
will_message = dict(
status="disconnected",
device_id=device_id,
user_id=user_id
)
if __name__ == '__main__':
local_state = read_device_state()
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=endpoint,
cert_filepath=cert,
pri_key_filepath=key,
ca_filepath=root_ca,
client_id=client_id,
client_bootstrap=client_bootstrap,
clean_session=False,
keep_alive_secs=30,
will=mqtt.Will(
topic="device/status",
qos=mqtt.QoS.AT_LEAST_ONCE,
payload=json.dumps(will_message).encode('utf-8'),
retain=False
),
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
)
# 디바이스 섀도우 클라이언트 생성
shadow_client = iotshadow.IotShadowClient(mqtt_connection)
if not os.path.exists(device_state_file):
initial_state = {shadow_property: SHADOW_VALUE_DEFAULT}
write_device_state(initial_state)
try:
# MQTT 연결
logger.debug("MQTT 연결 시도 중...")
connect_future = mqtt_connection.connect()
connect_future.result()
logger.info("MQTT 연결 성공!")
# 연결 성공 메시지 게시
connect_success_topic = "device/connected"
connect_success_message = {
"device_id": device_id,
"user_id": user_id,
}
mqtt_connection.publish(
topic=connect_success_topic,
payload=json.dumps(connect_success_message),
qos=mqtt.QoS.AT_LEAST_ONCE
)
logger.info(f"연결 성공 메시지 게시: {connect_success_message}")
# Shadow 상태 구독 및 요청
update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_update_shadow_accepted)
update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_update_shadow_rejected)
get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_get_shadow_accepted)
get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_get_shadow_rejected)
delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events(
request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_shadow_delta_updated)
update_accepted_subscribed_future.result()
update_rejected_subscribed_future.result()
get_accepted_subscribed_future.result()
get_rejected_subscribed_future.result()
delta_subscribed_future.result()
# 초기 섀도우 상태 요청
request_shadow_state()
# 센서 데이터 전송 스레드 시작
sensor_thread = Thread(target=publish_sensor_data)
sensor_thread.daemon = True
sensor_thread.start()
while True:
sleep(1)
except exceptions.AwsCrtError as e:
exit_program(e)
except KeyboardInterrupt:
exit_program("Interrupted by user")
except Exception as e:
exit_program(e)
매우 긴 코드를 만나게 되어 당황했을 것이다. 물론 이를 실제 사용을 할 때는 분리를 해서 리팩터링을 진행하는 것이 좋다.
< >로 표시된 값은 개별적으로 입력을 해야 한다.
코드를 작성했다면 필수적으로 AWS IoT Core 콘솔에서 정책을 생성한 뒤 기기에 적용해야 한다.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Publish",
"iot:Receive",
"iot:PublishRetain"
],
"Resource": [
"arn:aws:iot:<region>:<account-id>:topic/<원하는 토픽>",
"arn:aws:iot:<region>:<account-id>:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update",
"arn:aws:iot:<region>:<account-id>:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update/*",
"arn:aws:iot:<region>:<account-id>:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/get/*",
"arn:aws:iot:<region>:<account-id>:topic/$aws/things/${iot:Connection.Thing.ThingName}/shadow/get",
"arn:aws:iot:<region>:<account-id>:topic/device/status",
"arn:aws:iot:<region>:<account-id>:topic/device/connected"
]
},
{
"Effect": "Allow",
"Action": "iot:Subscribe",
"Resource": [
"arn:aws:iot:<region>:<account-id>:topicfilter/<원하는 토픽>",
"arn:aws:iot:<region>:<account-id>:topicfilter/$aws/rules/iotdb/*",
"arn:aws:iot:<region>:<account-id>:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/update/*",
"arn:aws:iot:<region>:<account-id>:topicfilter/$aws/things/${iot:Connection.Thing.ThingName}/shadow/get/*"
]
},
{
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": "arn:aws:iot:<region>:<account-id>:client/${iot:Connection.Thing.ThingName}"
},
{
"Effect": "Allow",
"Action": [
"iot:GetThingShadow",
"iot:UpdateThingShadow",
"iot:DeleteThingShadow"
],
"Resource": "arn:aws:iot:<region>:<account-id>:thing/${iot:Connection.Thing.ThingName}"
}
]
}
상세 설명
- IoT Core 연결 진행
# MQTT 연결
logger.debug("MQTT 연결 시도 중...")
connect_future = mqtt_connection.connect()
connect_future.result()
logger.info("MQTT 연결 성공!")
# 연결 성공 메시지 게시
connect_success_topic = "device/connected"
connect_success_message = {
"device_id": device_id,
"user_id": user_id,
}
mqtt_connection.publish(
topic=connect_success_topic,
payload=json.dumps(connect_success_message),
qos=mqtt.QoS.AT_LEAST_ONCE
)
logger.info(f"연결 성공 메시지 게시: {connect_success_message}")
코드의 하단 부에 있는 try/except 구문안에 가장 상단부 코드이다. 이 코드는 모의 IoT 기기가 IoT Core 연결 정보를 이용해 IoT Core와 연결을 진행하게 된다. 이때 연결에 성공하면 device/connected라는 토픽에 메시지를 게시하도록 구성했다.
- Device Shadow 토픽 구독
먼저 IoT Core와 기기의 연결이 성공했다면 이어서 Device Shadow를 위한 토픽들을 모의기기가 구독을 해야 한다. 이 코드가 사실 Device Shadow의 핵심이라고 생각된다.
# Shadow 상태 구독 및 요청
update_accepted_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_accepted(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_update_shadow_accepted)
update_rejected_subscribed_future, _ = shadow_client.subscribe_to_update_shadow_rejected(
request=iotshadow.UpdateShadowSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_update_shadow_rejected)
get_accepted_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_accepted(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_get_shadow_accepted)
get_rejected_subscribed_future, _ = shadow_client.subscribe_to_get_shadow_rejected(
request=iotshadow.GetShadowSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_get_shadow_rejected)
delta_subscribed_future, _ = shadow_client.subscribe_to_shadow_delta_updated_events(
request=iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_shadow_delta_updated)
update_accepted_subscribed_future.result()
update_rejected_subscribed_future.result()
get_accepted_subscribed_future.result()
get_rejected_subscribed_future.result()
delta_subscribed_future.result()
차례로 어떤 토픽을 구독하고 있는지 보자
- update accepted
- update rejected
- get accepted
- get rejected
- update delta
위 토픽을 구독하는 것보다 중요한 것이 연결된 콜백이다. 콜백은 좀 더 뒤에 설명을 하도록 하겠다.
위 이미지와 동일하게 MQTT Broker(AWS IoT Core)와 연결 후 update/accepted, update/rejected, update/delta라는 토픽을 구독하기 시작한다.
- update accepted : 업데이트를 성공했다고 보고하는 콜백 위 코드에서는 log를 작성하도록 한다.
- update rejected : 업데이트에 실패했다고 보고하는 콜백
- update delta : update 명령이 수신되는 부분으로 실제 상태 변경 로직이 포함되는 토픽
- get accepted : device shadow의 정보를 성공적으로 받았을 때 보고
- get rejected : device shadow의 정보를 받지 못했을 때 이를 보고
이제 본격적으로 디바이스 섀도를 이용해 상태 변경을 진행할 것이다.
- 초기 섀도우 상태 요청
# 초기 섀도우 상태 요청
request_shadow_state()
def request_shadow_state():
logger.info("섀도우 상태 요청")
try:
publish_get_future = shadow_client.publish_get_shadow(
request=iotshadow.GetShadowRequest(thing_name=thing_name),
qos=mqtt.QoS.AT_LEAST_ONCE
)
publish_get_future.result()
except Exception as e:
exit_program(e)
IoT Core와 연결이 성공하고 Device Shadow를 위한 토픽의 구독이 완료되면 이제 실제 shadow get 토픽에 메시지를 게시한다. 이때 중요한 것은 위에서 구독한 get accepted와 get rejected는 get 토픽에 메시지를 게시해야 받을 수 있는 값이다. 메시지를 게시하지 않고서는 콜백을 받을 수 없다는 것이다. 이때 왜 get 토픽에 메시지를 게시하는지는 유의점에서 소개하겠다.
- Get 토픽 콜백 실행
def on_get_shadow_accepted(response):
try:
logger.info("섀도우 상태를 성공적으로 가져왔습니다.")
if response.state:
if response.state.desired and shadow_property in response.state.desired:
desired_value = response.state.desired[shadow_property]
logger.info(f"기기가 시작될 때 원하는 상태는 '{desired_value}' 입니다.")
update_device_state(desired_value)
elif response.state.reported and shadow_property in response.state.reported:
reported_value = response.state.reported[shadow_property]
logger.info(f"기기가 시작될 때 보고된 상태는 '{reported_value}' 입니다.")
update_device_state(reported_value)
else:
logger.info("섀도우 상태에서 적용할 수 있는 값이 없습니다.")
else:
update_device_state(SHADOW_VALUE_DEFAULT)
except Exception as e:
exit_program(e)
def on_get_shadow_rejected(error):
logger.error(f"섀도우 상태를 가져오는 데 실패했습니다: code={error.code}, message={error.message}")
if error.code == 404:
update_device_state(SHADOW_VALUE_DEFAULT)
publish_shadow_update(SHADOW_VALUE_DEFAULT)
else:
exit_program(f"Get 요청이 거부되었습니다: code={error.code}, message={error.message}")
get 명령을 통해 받은 상태 값을 이용해 클라이언트가 원하는 상태와 현재 기기의 상태를 확인하여 동기화를 진행하게 된다.
위 get 토픽의 호출은 기기의 최초 실행 혹은 기기의 재 부팅과 같은 프로그램의 재시작 과정에서 1회 실행되는 코드이다.
디바이스 섀도를 적용하기 위해서 AWS IoT Core의 콘솔에서 Device Shadow를 생성해야 하는 것이 아닌가?라는 생각이 들 수 있다. 물론 AWS IoT Core 콘솔에서 직접 생성을 할 수 있지만 AWS IoT Core에 등록된 기기는 Device Shadow default 토픽(shadow update)에 메시지를 게시하는 것만으로도 자동으로 default Device Shadow를 생성할 수 있다.
- Update 토픽 수신
만약 클라이언트(모바일 앱)에서 IoT 기기의 상태를 변경하라는 shadow update 명령이 토픽에 게시되면 IoT Core의 디바이스 섀도(가상 기기)에 update 명령이 게시되고 가상기기에 update accepted 혹은 update rejected에 메시지가 게시될 것이다. 이때 기기가 IoT Core와 연결되어 있다면 기기에 Update Delta 토픽을 통해 메시지가 수신될 것이다.
def on_shadow_delta_updated(delta):
try:
logger.info("Delta 업데이트 수신")
if delta.state and shadow_property in delta.state:
shadow_value = delta.state[shadow_property]
update_device_state(shadow_value)
except Exception as e:
exit_program(e)
def on_update_shadow_accepted(response):
logger.info("섀도우 업데이트가 수락되었습니다.")
def on_update_shadow_rejected(error):
logger.error(f"업데이트 요청이 거부되었습니다: code={error.code}, message={error.message}")
def update_device_state(value):
local_state = read_device_state()
if local_state[shadow_property] != value:
local_state[shadow_property] = value
write_device_state(local_state)
logger.info(f"디바이스 상태를 {value}로 업데이트했습니다.")
publish_shadow_update(value)
def publish_shadow_update(value):
token = str(uuid4())
request = iotshadow.UpdateShadowRequest(
thing_name=thing_name,
state=iotshadow.ShadowState(
reported={shadow_property: value},
desired=None # desired 상태는 설정하지 않음 (기기의 현재 상태만 보고)
),
client_token=token,
)
shadow_client.publish_update_shadow(request, mqtt.QoS.AT_LEAST_ONCE)
delta 토픽을 통해 변경 요청이 들어오면 콜백 메서드가 실행되고 메시지에 있는 delta값을 이용해 상태를 변경하게 된다.
나는 기기의 상태 변경을 별도의 JSON 파일을 이용해 관리했다. 만약 이 방법을 사용하지 않을 경우 local_state를 저장하는 메서드를 수정하여 원하는 상태 저장 방법을 사용해 기기의 상태를 관리하면 될 것이다.
기기의 로컬 상태를 변경하고 변경된 상태를 reported에 담아 device shadow에 보고를 한다. 이 과정은 클라이언트가 원하는 값을(desired) 성공적으로 기기에 적용했다고 기기의 현재 상태를 (reported)로 설정하여 보고를 하는 것이다.
모든 코드 작업이 완료되었다면 해당 코드를 실행해서 문제없이 동작이 되는지 확인해 봐야 한다.
상태값 변경 실습
이제 코드 작업이 완료되었으니 실제 기기의 상태값을 변경해 보겠다. 먼저 AWS IoT Core 콘솔에 접속해 등록된 사물에서 디바이스 섀도를 확인한다.
보면 클래식 섀도라는 것이 생성된 것을 확인할 수 있다. 클릭하여 세부 정보를 확인해 본다.
아래에 있는 탭을 디바이스 섀도 문서에서 MQTT 주제로 변경하면 오른쪽에 MQTT 테스트 클라이언트라는 버튼이 있을 것이다 해당 버튼을 누르면 MQTT 테스트 클라이언트에 디바이스 섀도 관련 MQTT 토픽을 자동으로 구독해 준다. 클릭해 보자
새로운 창이 열리고 MQTT 테스트 클라이언트에 Device Shadow 관련 토픽이 자동으로 구독되어 있을 것이다 하지만 이 토픽만으로는 Update 명령을 내릴 수 없다.
$aws/things/<thing name>/shadow/update
위 토픽을 구독하자 (thing name은 변경해야 한다.)
{
"state": {
"desired": {
"color": "red"
}
}
}
이러한 메시지를 작성해서 게시를 눌러본다.
다음과 같은 메시지가 표시된다면 성공적으로 IoT 기기의 상태가 변경이 된 것이다.
추가로 테스트를 위해 실행 중인 프로그램을 종료하고 AWS IoT Core 콘솔에서 color 값을 변경해서 게시를 눌러본다.
이런 식으로 Device Shadow에 변경 요청이 가면서 update/accepted , update/documents 와 같은 토픽에 메시지가 게시됐다고 표시될 것이다.
그다음 다시 IoT 기기 프로그램을 실행해 본다.
다음과 같이 터미널에 로그 생성된 것을 확인하고 로컬에서 관리하는 JSON 파일의 color 값이 black으로 변경된 것을 볼 수 있다. 이렇게 표시되었다면 성공적으로 Device Shadow를 적용한 것이다.
3. Device Shadow 구현에 있어 유의할 점
shadow get topic 구독 이유
위 이미지를 참고해서 기능을 구현할 때 있었던 의문이다. 분명 device shadow get 토픽에 대해서 구독을 하는 부분이 없는 것을 확인했는데 AWS IoT Core Python SDK의 예시 코드나 개발자 가이드의 코드 등을 확인하면 전부 get accpeted 콜백으로 복잡한 코드가 있었고 특히 AWS IoT Core Python SDK의 경우 파이썬 프로그램의 실행 시 터미널 상에서 변숫값을 받아서 실행하는 구조로 되어 있어 초반에 이해하는 것이 매우 어려웠다.
위의 이미지 대로라면 디바이스의 상태 변경에는 update 토픽들만 있어도 되는 것 같았지만 이렇게 구현을 하니 기기의 전원이 꺼져 있는 상태에서 클라이언트의 변경 요청이 있을 경우 IoT 기기의 전원이 켜지고 프로그램이 실행될 때 클라이언트의 업데이트 명령을 수행할 수 없었던 것이다. 이렇게 되다 보니 디바이스 섀도를 사용하려던 이유인 재시작 시 업데이트되 변경 사항을 적용하는 기능을 사용하지 못하게 되니 Device Shadow를 사용하는 의미가 없어졌던 것이다. 나는 delta 토픽만 있으면 기기가 시작할 때 값을 불러와서 처리할 수 있다고 생각했는데 그것이 아니었다.
delta 이벤트는 desired 상태와 reported 상태 간의 불일치를 나타내는데, 장치가 AWS IoT에 연결되어 있을 때 desired 상태가 변경되면 발생한다고 한다. 하지만 장치가 연결되기 전에 desired 값이 설정된 경우, 장치가 처음 연결될 때 delta 이벤트가 발생하지 않는다고 한다. 이는 장치가 이전에 설정했던 reported 상태를 알지 못하고 delta를 트리거할 수 없기 때문이라고 한다.
즉 재 연결되는 기기가 자신이 어떤 상태이고 자신이 연결되지 않았던 동안 desired 값으로 어떤 값을 설정했는지 알기 위해 get 토픽을 조회하여 데이터를 불러오고 이 데이터에 desired와 reported 값이 다른 경우 이를 적용하고 동기화하는 과정이 필요했기 때문에 get 토픽을 구독하고 있었던 것이다.
디바이스 섀도 비용 발생
디바이스 섀도는 분명하게 추가 비용이 발생하는 서비스이다. 이 점을 유의해야 할 것이다. 만약 자신이 해결해야 하는 상황을 잘 살펴야 한다. 만약 IoT 기기에 조작 명령을 내려야 한다. 이때 클라이언트의 변경 요청이 절대 무시되는 경우 없이 무조건 기기에 적용이 되어야 한다고 하면 Device Shadow를 이용해 기기가 종료되었다 재 시작되는 경우에도 클라이언트가 보낸 변경 명령을 확인하고 적용하도록 할 수 있다. 반면 클라이언트의 조작 명령이 IoT 기기에 적용이 되어야 하지만 그 중요도가 떨어지는 경우 그러니까 기기가 켜져 있는 상태에서 적용되면 좋지만 만약 기기가 종료되어 있어 제 때 적용되지 않아도 될 정도의 상황이면 디바이스 섀도를 적용하지 않고 일반 MQTT 토픽을 이용해 별도로 구현을 하는 것이 좋을 것이다.
AWS IoT Core Device Policy 확인
만약 Device Shadow를 구현하고 실행을 했을 때 AWS IoT Core와 연결이 제대로 되지 않고 종료되는 경우가 있을 것이다. 나 또한 이런 경우가 매우 많았다. 이런 경우 거의 대부분 device policy에 정책이 문제가 있었다. 오타가 나서 이상한 토픽에 대해 허용을 했거나 하는 문제가 많았다. 프로그램의 실행 시 AWS IoT Core와의 연결에 문제가 있는 경우 일단 정책부터 다시 확인하는 것이 좋다.
4. 정리
지금 까지 AWS IoT Core Device Shadow를 적용하는 과정을 알아보았다. Device Shadow를 적용하는 과정에서 각종 문제가 있었고 많은 허튼짓?을 했지만 작동되는 코드를 소개해 봤다. 나 역시 Device Shadow를 구현해 보며 많은 문제를 경험했고 성공하게 된 코드를 가져와 본 것이다. Device Shadow를 구현해야 하는 개발자가 내 코드와 경험을 토대로 비교적 쉽게 기능 구현을 할 수 있으면 좋을 것 같다.
추천글
https://docs.aws.amazon.com/ko_kr/iot/latest/developerguide/iot-device-shadows.html
AWS IoT Device Shadow 서비스 - AWS IoT Core
섀도우 데이터 객체에 포함된 데이터는 다른 섀도우 및 사물의 속성 및 사물 객체의 디바이스가 게시할 수 있는 MQTT 메시지의 콘텐츠와 같은 기타 사물 객체 속성과는 독립적입니다. 그러나 필
docs.aws.amazon.com
https://iotatlas.net/en/implementations/aws/device_state_replica/device_state_replica1/
Device Shadows - MQTT Topics :: IoT Atlas
Device Shadows - MQTT Topics Device Shadow is a service of AWS IoT Core. Shadows can make a device’s state available via MQTT topics whether the device is connected or not. This allows Apps and Services to interact with the device state at any time. Use
iotatlas.net