Compare commits
2 Commits
master
...
refactorin
Author | SHA1 | Date | |
---|---|---|---|
bd2e64b0cd | |||
eb0d3d7353 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -3,6 +3,7 @@
|
|||||||
.tox
|
.tox
|
||||||
.cache
|
.cache
|
||||||
.venv
|
.venv
|
||||||
|
.history
|
||||||
*.egg-info
|
*.egg-info
|
||||||
/.project
|
/.project
|
||||||
/.pydevproject
|
/.pydevproject
|
||||||
|
@ -32,6 +32,7 @@ class TelegramParser(object):
|
|||||||
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
|
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
|
||||||
for object in self.telegram_specification['objects']
|
for object in self.telegram_specification['objects']
|
||||||
}
|
}
|
||||||
|
self._telegram_encryption_active = None
|
||||||
|
|
||||||
def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901
|
def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901
|
||||||
"""
|
"""
|
||||||
@ -46,38 +47,11 @@ class TelegramParser(object):
|
|||||||
:raises ParseError:
|
:raises ParseError:
|
||||||
:raises InvalidChecksumError:
|
:raises InvalidChecksumError:
|
||||||
"""
|
"""
|
||||||
|
telegram_data = self.decrypt_telegram_data(
|
||||||
if "general_global_cipher" in self.telegram_specification:
|
telegram_data=telegram_data,
|
||||||
if self.telegram_specification["general_global_cipher"]:
|
encryption_key=encryption_key,
|
||||||
enc_key = unhexlify(encryption_key)
|
authentication_key=authentication_key
|
||||||
auth_key = unhexlify(authentication_key)
|
)
|
||||||
telegram_data = unhexlify(telegram_data)
|
|
||||||
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
|
|
||||||
if apdu.security_control.security_suite != 0:
|
|
||||||
logger.warning("Untested security suite")
|
|
||||||
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
|
|
||||||
logger.warning("Untested authentication only")
|
|
||||||
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
|
|
||||||
logger.warning("Untested not encrypted or authenticated")
|
|
||||||
if apdu.security_control.compressed:
|
|
||||||
logger.warning("Untested compression")
|
|
||||||
if apdu.security_control.broadcast_key:
|
|
||||||
logger.warning("Untested broadcast key")
|
|
||||||
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
|
|
||||||
raise RuntimeError("Looks like a general_global_cipher frame "
|
|
||||||
"but telegram specification is not matching!")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
|
|
||||||
raise RuntimeError(
|
|
||||||
"Looks like a general_global_cipher frame but telegram specification is not matching!")
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if self.apply_checksum_validation and self.telegram_specification['checksum_support']:
|
if self.apply_checksum_validation and self.telegram_specification['checksum_support']:
|
||||||
self.validate_checksum(telegram_data)
|
self.validate_checksum(telegram_data)
|
||||||
@ -112,6 +86,42 @@ class TelegramParser(object):
|
|||||||
|
|
||||||
return telegram
|
return telegram
|
||||||
|
|
||||||
|
def decrypt_telegram_data(self, encryption_key, authentication_key, telegram_data):
|
||||||
|
"""
|
||||||
|
Check if telegram data is encrypted and decrypt if applicable.
|
||||||
|
"""
|
||||||
|
# if self._telegram_encryption_active is False:
|
||||||
|
# # If encryption is not working, stop trying and logging warnings.
|
||||||
|
# return telegram_data
|
||||||
|
|
||||||
|
if self.telegram_specification.get("general_global_cipher"):
|
||||||
|
enc_key = unhexlify(encryption_key)
|
||||||
|
auth_key = unhexlify(authentication_key)
|
||||||
|
telegram_data = unhexlify(telegram_data)
|
||||||
|
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
|
||||||
|
if apdu.security_control.security_suite != 0:
|
||||||
|
logger.warning("Untested security suite")
|
||||||
|
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
|
||||||
|
logger.warning("Untested authentication only")
|
||||||
|
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
|
||||||
|
logger.warning("Untested not encrypted or authenticated")
|
||||||
|
if apdu.security_control.compressed:
|
||||||
|
logger.warning("Untested compression")
|
||||||
|
if apdu.security_control.broadcast_key:
|
||||||
|
logger.warning("Untested broadcast key")
|
||||||
|
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
|
||||||
|
self._telegram_encryption_active = True
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
|
||||||
|
logger.warning("Looks like a general_global_cipher frame "
|
||||||
|
"but telegram specification is not matching!")
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
self._telegram_encryption_active = False
|
||||||
|
|
||||||
|
return telegram_data
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def validate_checksum(telegram):
|
def validate_checksum(telegram):
|
||||||
"""
|
"""
|
||||||
|
0
test/clients/__init__.py
Normal file
0
test/clients/__init__.py
Normal file
21
test/clients/test_filereader.py
Normal file
21
test/clients/test_filereader.py
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
import unittest
|
||||||
|
import tempfile
|
||||||
|
|
||||||
|
from dsmr_parser.clients.filereader import FileReader
|
||||||
|
from dsmr_parser.telegram_specifications import V5
|
||||||
|
from test.example_telegrams import TELEGRAM_V5
|
||||||
|
|
||||||
|
|
||||||
|
class FileReaderTest(unittest.TestCase):
|
||||||
|
def test_read_as_object(self):
|
||||||
|
with tempfile.NamedTemporaryFile() as file:
|
||||||
|
with open(file.name, "w") as f:
|
||||||
|
f.write(TELEGRAM_V5)
|
||||||
|
|
||||||
|
telegrams = []
|
||||||
|
reader = FileReader(file=file.name, telegram_specification=V5)
|
||||||
|
# Call
|
||||||
|
for telegram in reader.read_as_object():
|
||||||
|
telegrams.append(telegram)
|
||||||
|
|
||||||
|
self.assertEqual(len(telegrams), 1)
|
77
test/clients/test_rfxtrx_protocol.py
Normal file
77
test/clients/test_rfxtrx_protocol.py
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
from unittest.mock import Mock
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from dsmr_parser import obis_references as obis
|
||||||
|
from dsmr_parser.clients.rfxtrx_protocol import create_rfxtrx_dsmr_protocol, PACKETTYPE_DSMR, SUBTYPE_P1
|
||||||
|
from dsmr_parser.objects import Telegram
|
||||||
|
|
||||||
|
TELEGRAM_V2_2 = (
|
||||||
|
'/ISk5\2MT382-1004\r\n'
|
||||||
|
'\r\n'
|
||||||
|
'0-0:96.1.1(00000000000000)\r\n'
|
||||||
|
'1-0:1.8.1(00001.001*kWh)\r\n'
|
||||||
|
'1-0:1.8.2(00001.001*kWh)\r\n'
|
||||||
|
'1-0:2.8.1(00001.001*kWh)\r\n'
|
||||||
|
'1-0:2.8.2(00001.001*kWh)\r\n'
|
||||||
|
'0-0:96.14.0(0001)\r\n'
|
||||||
|
'1-0:1.7.0(0001.01*kW)\r\n'
|
||||||
|
'1-0:2.7.0(0000.00*kW)\r\n'
|
||||||
|
'0-0:17.0.0(0999.00*kW)\r\n'
|
||||||
|
'0-0:96.3.10(1)\r\n'
|
||||||
|
'0-0:96.13.1()\r\n'
|
||||||
|
'0-0:96.13.0()\r\n'
|
||||||
|
'0-1:24.1.0(3)\r\n'
|
||||||
|
'0-1:96.1.0(000000000000)\r\n'
|
||||||
|
'0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
|
||||||
|
'(00001.001)\r\n'
|
||||||
|
'0-1:24.4.0(1)\r\n'
|
||||||
|
'!\r\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
OTHER_RF_PACKET = b'\x03\x01\x02\x03'
|
||||||
|
|
||||||
|
|
||||||
|
def encode_telegram_as_RF_packets(telegram):
|
||||||
|
data = b''
|
||||||
|
|
||||||
|
for line in telegram.split('\n'):
|
||||||
|
packet_data = (line + '\n').encode('ascii')
|
||||||
|
packet_header = bytes(bytearray([
|
||||||
|
len(packet_data) + 3, # excluding length byte
|
||||||
|
PACKETTYPE_DSMR,
|
||||||
|
SUBTYPE_P1,
|
||||||
|
0 # seq num (ignored)
|
||||||
|
]))
|
||||||
|
|
||||||
|
data += packet_header + packet_data
|
||||||
|
# other RF packets can pass by on the line
|
||||||
|
data += OTHER_RF_PACKET
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
class RFXtrxProtocolTest(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
new_protocol, _ = create_rfxtrx_dsmr_protocol('2.2',
|
||||||
|
telegram_callback=Mock(),
|
||||||
|
keep_alive_interval=1)
|
||||||
|
self.protocol = new_protocol()
|
||||||
|
|
||||||
|
def test_complete_packet(self):
|
||||||
|
"""Protocol should assemble incoming lines into complete packet."""
|
||||||
|
|
||||||
|
data = encode_telegram_as_RF_packets(TELEGRAM_V2_2)
|
||||||
|
# send data broken up in two parts
|
||||||
|
self.protocol.data_received(data[0:200])
|
||||||
|
self.protocol.data_received(data[200:])
|
||||||
|
|
||||||
|
telegram = self.protocol.telegram_callback.call_args_list[0][0][0]
|
||||||
|
assert isinstance(telegram, Telegram)
|
||||||
|
|
||||||
|
assert float(telegram[obis.CURRENT_ELECTRICITY_USAGE].value) == 1.01
|
||||||
|
assert telegram[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
|
||||||
|
|
||||||
|
assert float(telegram[obis.GAS_METER_READING].value) == 1.001
|
||||||
|
assert telegram[obis.GAS_METER_READING].unit == 'm3'
|
@ -21,7 +21,7 @@ class TelegramParserV5Test(unittest.TestCase):
|
|||||||
telegram = parser.parse(TELEGRAM_V5, throw_ex=True)
|
telegram = parser.parse(TELEGRAM_V5, throw_ex=True)
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
assert False, f"parse trigged an exception {ex}"
|
assert False, f"parse trigged an exception {ex}"
|
||||||
print('test: ', type(telegram.P1_MESSAGE_HEADER), telegram.P1_MESSAGE_HEADER.__dict__)
|
|
||||||
# P1_MESSAGE_HEADER (1-3:0.2.8)
|
# P1_MESSAGE_HEADER (1-3:0.2.8)
|
||||||
assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject)
|
assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject)
|
||||||
assert telegram.P1_MESSAGE_HEADER.unit is None
|
assert telegram.P1_MESSAGE_HEADER.unit is None
|
||||||
|
Loading…
Reference in New Issue
Block a user