Compare commits

...

2 Commits

6 changed files with 142 additions and 33 deletions

1
.gitignore vendored
View File

@ -3,6 +3,7 @@
.tox
.cache
.venv
.history
*.egg-info
/.project
/.pydevproject

View File

@ -32,6 +32,7 @@ class TelegramParser(object):
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
for object in self.telegram_specification['objects']
}
self._telegram_encryption_active = None
def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901
"""
@ -46,38 +47,11 @@ class TelegramParser(object):
:raises ParseError:
:raises InvalidChecksumError:
"""
if "general_global_cipher" in self.telegram_specification:
if self.telegram_specification["general_global_cipher"]:
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:
logger.warning("Untested security suite")
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested authentication only")
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested not encrypted or authenticated")
if apdu.security_control.compressed:
logger.warning("Untested compression")
if apdu.security_control.broadcast_key:
logger.warning("Untested broadcast key")
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError("Looks like a general_global_cipher frame "
"but telegram specification is not matching!")
except Exception:
pass
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
raise RuntimeError(
"Looks like a general_global_cipher frame but telegram specification is not matching!")
except Exception:
pass
telegram_data = self.decrypt_telegram_data(
telegram_data=telegram_data,
encryption_key=encryption_key,
authentication_key=authentication_key
)
if self.apply_checksum_validation and self.telegram_specification['checksum_support']:
self.validate_checksum(telegram_data)
@ -112,6 +86,42 @@ class TelegramParser(object):
return telegram
def decrypt_telegram_data(self, encryption_key, authentication_key, telegram_data):
"""
Check if telegram data is encrypted and decrypt if applicable.
"""
# if self._telegram_encryption_active is False:
# # If encryption is not working, stop trying and logging warnings.
# return telegram_data
if self.telegram_specification.get("general_global_cipher"):
enc_key = unhexlify(encryption_key)
auth_key = unhexlify(authentication_key)
telegram_data = unhexlify(telegram_data)
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
if apdu.security_control.security_suite != 0:
logger.warning("Untested security suite")
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested authentication only")
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
logger.warning("Untested not encrypted or authenticated")
if apdu.security_control.compressed:
logger.warning("Untested compression")
if apdu.security_control.broadcast_key:
logger.warning("Untested broadcast key")
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
self._telegram_encryption_active = True
else:
try:
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
logger.warning("Looks like a general_global_cipher frame "
"but telegram specification is not matching!")
except Exception:
pass
self._telegram_encryption_active = False
return telegram_data
@staticmethod
def validate_checksum(telegram):
"""

0
test/clients/__init__.py Normal file
View File

View File

@ -0,0 +1,21 @@
import unittest
import tempfile
from dsmr_parser.clients.filereader import FileReader
from dsmr_parser.telegram_specifications import V5
from test.example_telegrams import TELEGRAM_V5
class FileReaderTest(unittest.TestCase):
def test_read_as_object(self):
with tempfile.NamedTemporaryFile() as file:
with open(file.name, "w") as f:
f.write(TELEGRAM_V5)
telegrams = []
reader = FileReader(file=file.name, telegram_specification=V5)
# Call
for telegram in reader.read_as_object():
telegrams.append(telegram)
self.assertEqual(len(telegrams), 1)

View File

@ -0,0 +1,77 @@
from unittest.mock import Mock
import unittest
from dsmr_parser import obis_references as obis
from dsmr_parser.clients.rfxtrx_protocol import create_rfxtrx_dsmr_protocol, PACKETTYPE_DSMR, SUBTYPE_P1
from dsmr_parser.objects import Telegram
TELEGRAM_V2_2 = (
'/ISk5\2MT382-1004\r\n'
'\r\n'
'0-0:96.1.1(00000000000000)\r\n'
'1-0:1.8.1(00001.001*kWh)\r\n'
'1-0:1.8.2(00001.001*kWh)\r\n'
'1-0:2.8.1(00001.001*kWh)\r\n'
'1-0:2.8.2(00001.001*kWh)\r\n'
'0-0:96.14.0(0001)\r\n'
'1-0:1.7.0(0001.01*kW)\r\n'
'1-0:2.7.0(0000.00*kW)\r\n'
'0-0:17.0.0(0999.00*kW)\r\n'
'0-0:96.3.10(1)\r\n'
'0-0:96.13.1()\r\n'
'0-0:96.13.0()\r\n'
'0-1:24.1.0(3)\r\n'
'0-1:96.1.0(000000000000)\r\n'
'0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
'(00001.001)\r\n'
'0-1:24.4.0(1)\r\n'
'!\r\n'
)
OTHER_RF_PACKET = b'\x03\x01\x02\x03'
def encode_telegram_as_RF_packets(telegram):
data = b''
for line in telegram.split('\n'):
packet_data = (line + '\n').encode('ascii')
packet_header = bytes(bytearray([
len(packet_data) + 3, # excluding length byte
PACKETTYPE_DSMR,
SUBTYPE_P1,
0 # seq num (ignored)
]))
data += packet_header + packet_data
# other RF packets can pass by on the line
data += OTHER_RF_PACKET
return data
class RFXtrxProtocolTest(unittest.TestCase):
def setUp(self):
new_protocol, _ = create_rfxtrx_dsmr_protocol('2.2',
telegram_callback=Mock(),
keep_alive_interval=1)
self.protocol = new_protocol()
def test_complete_packet(self):
"""Protocol should assemble incoming lines into complete packet."""
data = encode_telegram_as_RF_packets(TELEGRAM_V2_2)
# send data broken up in two parts
self.protocol.data_received(data[0:200])
self.protocol.data_received(data[200:])
telegram = self.protocol.telegram_callback.call_args_list[0][0][0]
assert isinstance(telegram, Telegram)
assert float(telegram[obis.CURRENT_ELECTRICITY_USAGE].value) == 1.01
assert telegram[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
assert float(telegram[obis.GAS_METER_READING].value) == 1.001
assert telegram[obis.GAS_METER_READING].unit == 'm3'

View File

@ -21,7 +21,7 @@ class TelegramParserV5Test(unittest.TestCase):
telegram = parser.parse(TELEGRAM_V5, throw_ex=True)
except Exception as ex:
assert False, f"parse trigged an exception {ex}"
print('test: ', type(telegram.P1_MESSAGE_HEADER), telegram.P1_MESSAGE_HEADER.__dict__)
# P1_MESSAGE_HEADER (1-3:0.2.8)
assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject)
assert telegram.P1_MESSAGE_HEADER.unit is None