flask-mqtt : subscribe 시 qos 설정 이슈 수정하기

Jan 29, 2018/Feb 01, 2018

이번 프로젝트를 하면서 주문관련 부분을 기존의 폴링(polling) 을 하던 방식에서 중간에 브로커서버를 두고 주문하는 쪽에서 주문을 보내면 브로커 서버의 특정 topic 을 구독하고 있는 구독자가 해당 주문을 받는 형태로 구성을 했다. 그 과정에서 mqtt 를 이용했고, emqtt 브로커 서버를 선택해서 사용하고 있다. 주문을 하는 쪽에서는 flask 로 구성되어 있어서 별도의 paho client 라이브러리를 띄워서 쓰기 보다는 flask-mqtt 를 사용했다.(내부적으로 paho 를 사용한다.)

문제발견

사용하다보니 자연스럽게 QOS 에 대한 이슈가 생겼다. 유실되는 부분이 있었고, 0 으로 놓고 쓰던 것을 2로 올려서 사용하자고 합의하게 되었는데, flask-mqtt 에서 아래와 같이 subscribe 하는 부분에 qos 를 2 로 올렸는데 emqtt dashboard 에서 subscribe 에서 계속 0으로 설정된 것을 발견했다.

import json  
from flask import Flask  
from flask_mqtt import Mqtt

app = Flask(__name__)  
app.config['MQTT_BROKER_URL'] = 'localhost'  
app.config['MQTT_BROKER_PORT'] = 1993  
app.config['MQTT_KEEPALIVE'] = 30

mqtt = Mqtt(app)  
mqtt.subscribe(topic="TEST", qos=2)

app.run()  

위와 같이 튜토리얼에 나와있는대로 제일 간단한 형태로 만들어서 테스트 해봐도 계속 qos 가 2로 변경되지는 않았다. flask-mqtt 도 결국 내부에서 paho 를 쓰기 때문에 client 의 subscribe() 함수를 봤는데, 아래와 같이 topic 이 basestring 일 경우와 tuple 일 경우에 대한 분기 처리가 되어 있었다.

def subscribe(self, topic, qos=0):

    topic_qos_list = None

    if isinstance(topic, tuple):
        topic, qos = topic

    if isinstance(topic, basestring):
        if qos < 0 or qos > 2:
            raise ValueError('Invalid QoS level.')
        if topic is None or len(topic) == 0:
            raise ValueError('Invalid topic.')
        topic_qos_list = [(topic.encode('utf-8'), qos)]

그래서 다시 아래와 같이 flask-mqtt 에서 tuple 의 형태로 topic 과 qos 를 전달해 보기로 했다.

mqtt.subscribe(("TEST_FLAK_MQTT",2)) # tuple 

#mqtt.subscribe(topic="TEST_FLAK_MQTTT", qos=2)

위와 같이 tuple 로 전달할 경우, 제대로 subscribe 의 qos 가 2 로 설정되는 것을 확인했다. 이상했다. 왜 저렇게 동작을 할까? 의문이 많이 들었고, 이 부분을 flask-mqtt 의 이슈 #19 로 올렸는데 원래 만든 개발자 역시 의문을 품었다.

문제 파고들기

flask-mqtt 의 subscribe() 함수에 topic 과 qos 가 어떻게 들어오는 지 로그를 남겨봤는데, 이상하게 subscribe 를 한번 호출했는데 2번 로그가 출력이 되었고, qos 가 처음에는 2, 그 다음에는 0 이렇게 찍히는 것을 확인했다.

topic : 'TEST' qos : 2  
topic : 'TEST' qos : 0  

왜 두번 출력이 될까?

flask-mqtt 에 찾아보니 _handle_connect 함수에서 subscribe 함수를 호출하고 있었다. 이 함수는 paho client 의 on_connect 의 이벤트 핸들러 함수로 연결시 호출하게 되어 있다. 이 함수를 보면 원인을 알 수가 있다.

def _handle_connect(self, client, userdata, flags, rc):  
    # type: (Client, Any, Dict, int) -> None
    if rc == MQTT_ERR_SUCCESS:
        self.connected = True
        for topic in self.topics:
            self.client.subscribe(topic)
    if self._connect_handler is not None:
        self._connect_handler(client, userdata, flags, rc)

2번 호출이 되는 이유는 첫번째는 내가 명시적인 호출로 인해서 호출이 되는데 접속이 되면 on_connect 에 연결된 _handle_connect 함수가 호출되고 그 안에서 subscribe 다시 호출하고 있다. 이 과정에서 qos 는 저장하지 않은 상태에서 topic 으로 다시 구독을 신청하기 때문에 기본값인 qos:0 값이 들어가서 아무리 2로 설정해도 0으로 다시 덮어써져 버린다.

해결하기

일단 해결을 할지 말지에 대해서 생각해 봤다. 앞서 말한것 처럼 tuple 형태 ('TEST', 2) 이런식으로 topic 을 전달하면 paho 라이브러리에서 파싱되어서 2로 세팅 할 수 있다. 그리고 flask-mqtt 개발자는 on_connect 시 subscribe 를 하라고 했는데, 제일 간단한 해결책이지만, 이 방법은 2로 명시적으로 설정하고 0으로 설정된 것을 다시 2로 설정하는 덮어쓰기 식이어서 문제가 있다고 생각했다.

@mqtt.on_connect()
def connected(*args, **kwargs):  
    mqtt.subscribe("TEST_FLASK_MQTT", 2)

일단 가장 큰 문제는 qos 를 topic 과 함께 저장하지 않는다는 것이다. 그래서 qos 를 topic 과 함께 저장하기 위해서 namedtuple 형태의 TopicQos 를 만들었다.

TopicQos = namedtuple('TopicQos', ['topic', 'qos'])  

그리고 나서 기존의 list 에 topic 을 저장하던 것을 TopicQos 를 생성해서 저장하면 되겠지라고 간단하게 생각했는데 unsubscribe() 함수에서 qos 를 받지 않기 때문에 기존 코드를 그대로 쓸 수가 없었다. qos 가 없어서 self.topics 에서 topic 이 있는지 확인할 수가 없고, remove 코드 역시 그대로 쓸 수가 없었다.

def unsubscribe(self, topic):  
    if topic not in self.topics:
        return

    result, mid = self.client.unsubscribe(topic)

    # if successful remove from topics
    if result == MQTT_ERR_SUCCESS:
        self.topics.remove(topic)

    return result, mid

unsubscribe() 함수에 qos 인자를 추가하는 것보다는, self.topics 에서 qos 가 없어서 같은 topic 이면 topic 문자열 만으로 찾고, 삭제(remove) 하는 기능을 만들었다. 기존의 self.topics 는 일반 list 를 사용하고 있지만, 일반 list 를 사용하는 부분과 유사하게 지원하기 위해서 collections.MutableSequence 를 이용해서 TopicQosList 클래스를 만들었다. 내부적으로는 리스트로 동작하지만, list 와 같은 함수들을 지원하기 위해서 구현해야하는 몇 가지 magic method(__ 로 시작하는 메소드)를 구현했다.

class TopicQosList(MutableSequence):

    def __init__(self):
        self.topic_qos = []

    def __setitem__(self, index, value):
        if isinstance(value, TopicQos):
            self.topic_qos.insert(index, value)
        else:
            raise TypeError('value type error')

    def insert(self, index, value):
        if isinstance(value, TopicQos):
            self.topic_qos.insert(index, value)
        else:
            raise TypeError('value type error')

    def __delitem__(self, index):
        del self.topic_qos[index]

    def __len__(self):
        return len(self.topic_qos)

    def __getitem__(self, index):
        return self.topic_qos[index]

    def __repr__(self):
        return str(self.topic_qos)

    def __contains__(self, item):
        if isinstance(item, TopicQos):
            return item in self.topic_qos
        elif isinstance(item, str):
            for topic, _ in self.topic_qos:
                if topic == topic:
                    return True
            return False
        else:
            return False

    def index(self, value, **kwargs):
        for i, v in enumerate(self):
            if isinstance(value, TopicQos):
                if v == value:
                    return i
            if isinstance(value, str):
                if v.topic == value:
                    return i
        raise ValueError

__contains__() 부분이 in 부분을 수행할 때 호출되는 부분인데, 이 함수에서 TopicQos 인지, str 인지 체크해서 str 인 경우에는 topic 문자열이 같으면 True 를 반환하도록 수정하였다. remove(topic) 을 지원하기 위해서 index 함수에서도 마찬가지로 str 의 경우에 해당 index 값을 반환하도록 수정하였다.

만들고 난후, PR 을 보냈는데 travis-ci 를 통과하긴 했지만, flaks-mqtt 개발자가 굳이 normal list 대신 MutableSequence 를 쓴 이유를 묻긴 했는데, list 보다는 collections 의 추상클래스를 상속받아서 구현하는게 낫다는 책의 내용을 봐서 그렇게 구현한 것 같다. 이 부분에 대해서는 어떤식으로 결론이 날지는 모르겠다.(PR이 머지될지는 미지수;;)

2018.02.01

후기

사실 별거 아닌 PR 인데, 실 프로젝트에서 사용하는 오픈소스에서 문제점을 발견하고 고쳤다는 점에서 재밌었다. 기여를 했다 안했다가 중요한 건 아닌것 같고, 덮어두고 사용하기 급급했는데 좀 더 안쪽까지 보게된 계기였던것 같다.


#FLASK  #github  #mqtt  #flask-mqtt  #QOS 

Related Post 👇

  • 발표자료 - github : 유용한 기능들