Compare commits
2 Commits
master
...
refactorin
Author | SHA1 | Date | |
---|---|---|---|
bd2e64b0cd | |||
eb0d3d7353 |
1
.gitignore
vendored
1
.gitignore
vendored
@ -3,6 +3,7 @@
|
||||
.tox
|
||||
.cache
|
||||
.venv
|
||||
.history
|
||||
*.egg-info
|
||||
/.project
|
||||
/.pydevproject
|
||||
|
@ -32,6 +32,7 @@ class TelegramParser(object):
|
||||
object["obis_reference"]: re.compile(object["obis_reference"], re.DOTALL | re.MULTILINE)
|
||||
for object in self.telegram_specification['objects']
|
||||
}
|
||||
self._telegram_encryption_active = None
|
||||
|
||||
def parse(self, telegram_data, encryption_key="", authentication_key="", throw_ex=False): # noqa: C901
|
||||
"""
|
||||
@ -46,38 +47,11 @@ class TelegramParser(object):
|
||||
:raises ParseError:
|
||||
:raises InvalidChecksumError:
|
||||
"""
|
||||
|
||||
if "general_global_cipher" in self.telegram_specification:
|
||||
if self.telegram_specification["general_global_cipher"]:
|
||||
enc_key = unhexlify(encryption_key)
|
||||
auth_key = unhexlify(authentication_key)
|
||||
telegram_data = unhexlify(telegram_data)
|
||||
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
|
||||
if apdu.security_control.security_suite != 0:
|
||||
logger.warning("Untested security suite")
|
||||
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
|
||||
logger.warning("Untested authentication only")
|
||||
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
|
||||
logger.warning("Untested not encrypted or authenticated")
|
||||
if apdu.security_control.compressed:
|
||||
logger.warning("Untested compression")
|
||||
if apdu.security_control.broadcast_key:
|
||||
logger.warning("Untested broadcast key")
|
||||
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
|
||||
else:
|
||||
try:
|
||||
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
|
||||
raise RuntimeError("Looks like a general_global_cipher frame "
|
||||
"but telegram specification is not matching!")
|
||||
except Exception:
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
|
||||
raise RuntimeError(
|
||||
"Looks like a general_global_cipher frame but telegram specification is not matching!")
|
||||
except Exception:
|
||||
pass
|
||||
telegram_data = self.decrypt_telegram_data(
|
||||
telegram_data=telegram_data,
|
||||
encryption_key=encryption_key,
|
||||
authentication_key=authentication_key
|
||||
)
|
||||
|
||||
if self.apply_checksum_validation and self.telegram_specification['checksum_support']:
|
||||
self.validate_checksum(telegram_data)
|
||||
@ -112,6 +86,42 @@ class TelegramParser(object):
|
||||
|
||||
return telegram
|
||||
|
||||
def decrypt_telegram_data(self, encryption_key, authentication_key, telegram_data):
|
||||
"""
|
||||
Check if telegram data is encrypted and decrypt if applicable.
|
||||
"""
|
||||
# if self._telegram_encryption_active is False:
|
||||
# # If encryption is not working, stop trying and logging warnings.
|
||||
# return telegram_data
|
||||
|
||||
if self.telegram_specification.get("general_global_cipher"):
|
||||
enc_key = unhexlify(encryption_key)
|
||||
auth_key = unhexlify(authentication_key)
|
||||
telegram_data = unhexlify(telegram_data)
|
||||
apdu = XDlmsApduFactory.apdu_from_bytes(apdu_bytes=telegram_data)
|
||||
if apdu.security_control.security_suite != 0:
|
||||
logger.warning("Untested security suite")
|
||||
if apdu.security_control.authenticated and not apdu.security_control.encrypted:
|
||||
logger.warning("Untested authentication only")
|
||||
if not apdu.security_control.authenticated and not apdu.security_control.encrypted:
|
||||
logger.warning("Untested not encrypted or authenticated")
|
||||
if apdu.security_control.compressed:
|
||||
logger.warning("Untested compression")
|
||||
if apdu.security_control.broadcast_key:
|
||||
logger.warning("Untested broadcast key")
|
||||
telegram_data = apdu.to_plain_apdu(enc_key, auth_key).decode("ascii")
|
||||
self._telegram_encryption_active = True
|
||||
else:
|
||||
try:
|
||||
if unhexlify(telegram_data[0:2])[0] == GeneralGlobalCipher.TAG:
|
||||
logger.warning("Looks like a general_global_cipher frame "
|
||||
"but telegram specification is not matching!")
|
||||
except Exception:
|
||||
pass
|
||||
self._telegram_encryption_active = False
|
||||
|
||||
return telegram_data
|
||||
|
||||
@staticmethod
|
||||
def validate_checksum(telegram):
|
||||
"""
|
||||
|
0
test/clients/__init__.py
Normal file
0
test/clients/__init__.py
Normal file
21
test/clients/test_filereader.py
Normal file
21
test/clients/test_filereader.py
Normal file
@ -0,0 +1,21 @@
|
||||
import unittest
|
||||
import tempfile
|
||||
|
||||
from dsmr_parser.clients.filereader import FileReader
|
||||
from dsmr_parser.telegram_specifications import V5
|
||||
from test.example_telegrams import TELEGRAM_V5
|
||||
|
||||
|
||||
class FileReaderTest(unittest.TestCase):
|
||||
def test_read_as_object(self):
|
||||
with tempfile.NamedTemporaryFile() as file:
|
||||
with open(file.name, "w") as f:
|
||||
f.write(TELEGRAM_V5)
|
||||
|
||||
telegrams = []
|
||||
reader = FileReader(file=file.name, telegram_specification=V5)
|
||||
# Call
|
||||
for telegram in reader.read_as_object():
|
||||
telegrams.append(telegram)
|
||||
|
||||
self.assertEqual(len(telegrams), 1)
|
77
test/clients/test_rfxtrx_protocol.py
Normal file
77
test/clients/test_rfxtrx_protocol.py
Normal file
@ -0,0 +1,77 @@
|
||||
from unittest.mock import Mock
|
||||
|
||||
import unittest
|
||||
|
||||
from dsmr_parser import obis_references as obis
|
||||
from dsmr_parser.clients.rfxtrx_protocol import create_rfxtrx_dsmr_protocol, PACKETTYPE_DSMR, SUBTYPE_P1
|
||||
from dsmr_parser.objects import Telegram
|
||||
|
||||
TELEGRAM_V2_2 = (
|
||||
'/ISk5\2MT382-1004\r\n'
|
||||
'\r\n'
|
||||
'0-0:96.1.1(00000000000000)\r\n'
|
||||
'1-0:1.8.1(00001.001*kWh)\r\n'
|
||||
'1-0:1.8.2(00001.001*kWh)\r\n'
|
||||
'1-0:2.8.1(00001.001*kWh)\r\n'
|
||||
'1-0:2.8.2(00001.001*kWh)\r\n'
|
||||
'0-0:96.14.0(0001)\r\n'
|
||||
'1-0:1.7.0(0001.01*kW)\r\n'
|
||||
'1-0:2.7.0(0000.00*kW)\r\n'
|
||||
'0-0:17.0.0(0999.00*kW)\r\n'
|
||||
'0-0:96.3.10(1)\r\n'
|
||||
'0-0:96.13.1()\r\n'
|
||||
'0-0:96.13.0()\r\n'
|
||||
'0-1:24.1.0(3)\r\n'
|
||||
'0-1:96.1.0(000000000000)\r\n'
|
||||
'0-1:24.3.0(161107190000)(00)(60)(1)(0-1:24.2.1)(m3)\r\n'
|
||||
'(00001.001)\r\n'
|
||||
'0-1:24.4.0(1)\r\n'
|
||||
'!\r\n'
|
||||
)
|
||||
|
||||
OTHER_RF_PACKET = b'\x03\x01\x02\x03'
|
||||
|
||||
|
||||
def encode_telegram_as_RF_packets(telegram):
|
||||
data = b''
|
||||
|
||||
for line in telegram.split('\n'):
|
||||
packet_data = (line + '\n').encode('ascii')
|
||||
packet_header = bytes(bytearray([
|
||||
len(packet_data) + 3, # excluding length byte
|
||||
PACKETTYPE_DSMR,
|
||||
SUBTYPE_P1,
|
||||
0 # seq num (ignored)
|
||||
]))
|
||||
|
||||
data += packet_header + packet_data
|
||||
# other RF packets can pass by on the line
|
||||
data += OTHER_RF_PACKET
|
||||
|
||||
return data
|
||||
|
||||
|
||||
class RFXtrxProtocolTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
new_protocol, _ = create_rfxtrx_dsmr_protocol('2.2',
|
||||
telegram_callback=Mock(),
|
||||
keep_alive_interval=1)
|
||||
self.protocol = new_protocol()
|
||||
|
||||
def test_complete_packet(self):
|
||||
"""Protocol should assemble incoming lines into complete packet."""
|
||||
|
||||
data = encode_telegram_as_RF_packets(TELEGRAM_V2_2)
|
||||
# send data broken up in two parts
|
||||
self.protocol.data_received(data[0:200])
|
||||
self.protocol.data_received(data[200:])
|
||||
|
||||
telegram = self.protocol.telegram_callback.call_args_list[0][0][0]
|
||||
assert isinstance(telegram, Telegram)
|
||||
|
||||
assert float(telegram[obis.CURRENT_ELECTRICITY_USAGE].value) == 1.01
|
||||
assert telegram[obis.CURRENT_ELECTRICITY_USAGE].unit == 'kW'
|
||||
|
||||
assert float(telegram[obis.GAS_METER_READING].value) == 1.001
|
||||
assert telegram[obis.GAS_METER_READING].unit == 'm3'
|
@ -21,7 +21,7 @@ class TelegramParserV5Test(unittest.TestCase):
|
||||
telegram = parser.parse(TELEGRAM_V5, throw_ex=True)
|
||||
except Exception as ex:
|
||||
assert False, f"parse trigged an exception {ex}"
|
||||
print('test: ', type(telegram.P1_MESSAGE_HEADER), telegram.P1_MESSAGE_HEADER.__dict__)
|
||||
|
||||
# P1_MESSAGE_HEADER (1-3:0.2.8)
|
||||
assert isinstance(telegram.P1_MESSAGE_HEADER, CosemObject)
|
||||
assert telegram.P1_MESSAGE_HEADER.unit is None
|
||||
|
Loading…
Reference in New Issue
Block a user