#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WS20 / Elsner P01/P02 UART receiver with a fixed COM port.

Adjust SERIAL_PORT and BAUDRATE below before running if necessary.

Installation:
    pip install pyserial
"""

from __future__ import annotations

import math
import sys
import time
from dataclasses import dataclass
from typing import Optional, Tuple

try:
    import serial
except ImportError:
    # pyserial is optional at import time; main() reports the problem.
    serial = None


# ---------------------------------------------------------------------------
# FIXED SETTINGS FOR THE DEVELOPMENT ENVIRONMENT
# ---------------------------------------------------------------------------
# Enter the COM port and baud rate directly here.
# Windows examples: "COM1", "COM3", "COM7"
# Linux examples:   "/dev/ttyUSB0", "/dev/ttyS0", "/dev/ttyACM0"
SERIAL_PORT = "COM54"
BAUDRATE = 1200
SERIAL_TIMEOUT = 0.2
TEMP_OFFSET = 0.0
VERBOSE_OUTPUT = False

START_BYTE = 0x53
P01_LEN = 14
P02_LEN = 17


@dataclass
class WeatherFrame:
    """Decoded fields of one checksum-verified P01 or P02 frame."""

    frame_type: str                       # "P01/Legacy" or "P02 erweitert"
    raw: bytes                            # complete frame as received
    temp_raw: int                         # bytes 1-2, little endian
    bright_s_raw: int                     # bytes 3-4, little endian
    bright_w_raw: int                     # bytes 5-6, little endian
    bright_o_raw: int                     # bytes 7-8, little endian
    night_code: int                       # byte 9
    unknown_11: int                       # byte 10 (meaning unknown)
    wind_raw: int                         # byte 11
    rain_raw: int                         # byte 12
    cs1: int                              # byte 13, checksum over bytes 0-12
    cs2: Optional[int] = None             # byte 16 (P02 only), checksum over bytes 0-15
    lux_direct_p02: Optional[int] = None  # bytes 14-15 (P02 only), little endian


def checksum8(data: bytes) -> int:
    """Return the sum of all bytes in *data* modulo 256."""
    return sum(data) & 0xFF


def round_half_away_from_zero(x: float) -> int:
    """Round *x* to the nearest integer, with ties going away from zero.

    NaN and infinities yield 0.
    """
    if math.isnan(x) or math.isinf(x):
        return 0
    if x >= 0:
        return int(math.floor(x + 0.5))
    return -int(math.floor(abs(x) + 0.5))


def round_to_1_decimal_half_away(x: float) -> float:
    """Round *x* to one decimal place, with ties going away from zero."""
    return round_half_away_from_zero(x * 10.0) / 10.0


# ---------------------------------------------------------------------------
# ROM code calculations
# ---------------------------------------------------------------------------
def calc_temperature_rom(temp_raw: int, temp_offset: float = 0.0) -> Tuple[float, float, float, float]:
    """
    Temperature calculation following the reconstructed ROM computation chain.

        Temp_raw = LSB + 256*MSB
        U  = Temp_raw * 1.2207000256 / 1000.0
        R  = 3570.0 * U / (5.0 - U)
        T0 = ((R - 2000.0) / 15.8) + 25.0 + TempOffset
        T  = T0 - (30.0 - T0)*0.13 if T0 < 30.0, otherwise T0

    Returns:
        (temperature, T0, sensor resistance, voltage). When U >= 5.0 the
        resistance formula is not evaluated and the first three values
        are NaN.
    """
    u = temp_raw * 1.2207000256 / 1000.0
    if u >= 5.0:
        # Guards the division by (5.0 - U) against zero / negative values.
        return float("nan"), float("nan"), float("nan"), u

    r = 3570.0 * u / (5.0 - u)
    t0 = ((r - 2000.0) / 15.8) + 25.0 + temp_offset
    if t0 < 30.0:
        t = t0 - ((30.0 - t0) * 0.13)
    else:
        t = t0
    return t, t0, r, u


def calc_brightness_rom(raw: int) -> Tuple[float, int]:
    """
    Brightness calculation following the ROM code.

        if raw < 200: H = 0
        else:         H = display_conversion((raw - 200) * 1.2207000256 / 45.0)

    Returns:
        (unrounded value, value rounded half away from zero).
    """
    if raw < 200:
        return 0.0, 0
    h = (raw - 200) * 1.2207000256 / 45.0
    return h, round_half_away_from_zero(h)


def calc_wind_rom(wind_raw: int) -> Tuple[float, float]:
    """Wind_m_s = Wind_raw * 0.263199985 following the ROM code.

    Returns:
        (unrounded value, value rounded to one decimal place).
    """
    w = wind_raw * 0.263199985
    return w, round_to_1_decimal_half_away(w)


def parse_frame(frame: bytes) -> Optional[WeatherFrame]:
    """Decode a complete P01 (14-byte) or P02 (17-byte) frame.

    Returns:
        The decoded WeatherFrame, or None when the length, the start byte
        or one of the checksums does not match.
    """
    if len(frame) not in (P01_LEN, P02_LEN):
        return None
    if frame[0] != START_BYTE:
        return None

    cs1 = checksum8(frame[:13])
    if cs1 != frame[13]:
        return None

    # 16-bit fields are transmitted low byte first.
    temp_raw = frame[1] | (frame[2] << 8)
    bright_s_raw = frame[3] | (frame[4] << 8)
    bright_w_raw = frame[5] | (frame[6] << 8)
    bright_o_raw = frame[7] | (frame[8] << 8)
    night_code = frame[9]
    unknown_11 = frame[10]
    wind_raw = frame[11]
    rain_raw = frame[12]

    if len(frame) == P01_LEN:
        return WeatherFrame(
            frame_type="P01/Legacy",
            raw=frame,
            temp_raw=temp_raw,
            bright_s_raw=bright_s_raw,
            bright_w_raw=bright_w_raw,
            bright_o_raw=bright_o_raw,
            night_code=night_code,
            unknown_11=unknown_11,
            wind_raw=wind_raw,
            rain_raw=rain_raw,
            cs1=frame[13],
        )

    # P02: the second checksum covers everything before the final byte.
    cs2 = checksum8(frame[:16])
    if cs2 != frame[16]:
        return None

    lux_direct = frame[14] | (frame[15] << 8)
    return WeatherFrame(
        frame_type="P02 erweitert",
        raw=frame,
        temp_raw=temp_raw,
        bright_s_raw=bright_s_raw,
        bright_w_raw=bright_w_raw,
        bright_o_raw=bright_o_raw,
        night_code=night_code,
        unknown_11=unknown_11,
        wind_raw=wind_raw,
        rain_raw=rain_raw,
        cs1=frame[13],
        cs2=frame[16],
        lux_direct_p02=lux_direct,
    )


def extract_next_frame(buffer: bytearray) -> Optional[bytes]:
    """Find the next valid P01 or P02 frame in the receive buffer.

    *buffer* is modified in place: bytes in front of a start byte, rejected
    start bytes and the returned frame are removed.

    A P01 frame is only returned once at least P02_LEN bytes are buffered,
    because its first 14 bytes are indistinguishable from the beginning of
    a P02 frame until the second checksum can be evaluated.

    Returns:
        The frame bytes, or None when more data is needed.
    """
    while True:
        try:
            start = buffer.index(START_BYTE)
        except ValueError:
            # No start byte anywhere: nothing in the buffer is usable.
            buffer.clear()
            return None

        if start > 0:
            del buffer[:start]

        if len(buffer) < P01_LEN:
            return None

        # Both frame types need a valid first checksum. If it already
        # fails, this start byte cannot begin a frame, so resynchronise
        # immediately instead of waiting for P02_LEN bytes.
        if checksum8(buffer[:13]) != buffer[13]:
            del buffer[0]
            continue

        # Not enough bytes yet to safely rule out P02.
        if len(buffer) < P02_LEN:
            return None

        # Prefer P02 when its second checksum matches as well.
        if checksum8(buffer[:16]) == buffer[16]:
            frame = bytes(buffer[:P02_LEN])
            del buffer[:P02_LEN]
            return frame

        frame = bytes(buffer[:P01_LEN])
        del buffer[:P01_LEN]
        return frame


def night_text(frame: WeatherFrame) -> str:
    """Return the display text for the night/status byte of *frame*."""
    if frame.night_code == 0x00:
        return "Tag / nicht Nacht"
    if frame.frame_type.startswith("P01") and frame.night_code == 0x64:
        return "Nacht (P01-Code 0x64)"
    if frame.frame_type.startswith("P02") and frame.night_code == 0x1F:
        return "Nacht (P02-Code 0x1F)"
    # Night code of the respective other frame type.
    if frame.night_code == 0x64:
        return "Nachtcode 0x64"
    if frame.night_code == 0x1F:
        return "Nachtcode 0x1F"
    return f"unbekannt: 0x{frame.night_code:02X}"


def rain_text(rain_raw: int) -> str:
    """Return the display text for the raw rain byte."""
    if rain_raw == 0:
        return "trocken"
    if 1 <= rain_raw <= 7:
        return f"Regen aktiv, Stufe/Code {rain_raw}"
    return f"unbekannter Regenwert {rain_raw}"


def format_frame(frame: WeatherFrame) -> str:
    """Render *frame* as a multi-line, human-readable report.

    The report is stamped with the local time at which it is formatted.
    """
    temp, temp_t0, temp_r, temp_u = calc_temperature_rom(frame.temp_raw, temp_offset=TEMP_OFFSET)
    bright_s, bright_s_disp = calc_brightness_rom(frame.bright_s_raw)
    bright_w, bright_w_disp = calc_brightness_rom(frame.bright_w_raw)
    bright_o, bright_o_disp = calc_brightness_rom(frame.bright_o_raw)
    wind, wind_disp = calc_wind_rom(frame.wind_raw)

    raw_hex = " ".join(f"{b:02X}" for b in frame.raw)

    # round_half_away_from_zero() maps NaN to 0; without this check an
    # out-of-range sensor voltage would be shown as a plausible "0 °C".
    if math.isnan(temp):
        temp_disp = "ungültig"
    else:
        temp_disp = f"{round_half_away_from_zero(temp):d} °C"

    lines = []
    lines.append("=" * 78)
    lines.append(f"{time.strftime('%Y-%m-%d %H:%M:%S')} {frame.frame_type}")
    lines.append(f"Raw: {raw_hex}")
    lines.append(f"Checksumme 1: 0x{frame.cs1:02X} gültig")
    if frame.cs2 is not None:
        lines.append(f"Checksumme 2: 0x{frame.cs2:02X} gültig")
    lines.append("")
    lines.append(f"Temperatur raw: {frame.temp_raw:5d} -> {temp:6.2f} °C Anzeige: {temp_disp}")
    if VERBOSE_OUTPUT:
        lines.append(f" U={temp_u:.6f} V, R={temp_r:.2f} Ohm, T0={temp_t0:.4f} °C, Offset={TEMP_OFFSET:.3f}")
    lines.append(f"Helligkeit Süd raw: {frame.bright_s_raw:5d} -> {bright_s:7.3f} Anzeige: {bright_s_disp:d} kLux")
    lines.append(f"Helligkeit West raw: {frame.bright_w_raw:5d} -> {bright_w:7.3f} Anzeige: {bright_w_disp:d} kLux")
    lines.append(f"Helligkeit Ost raw: {frame.bright_o_raw:5d} -> {bright_o:7.3f} Anzeige: {bright_o_disp:d} kLux")
    if frame.lux_direct_p02 is not None:
        valid = "" if 0 <= frame.lux_direct_p02 <= 999 else " (außerhalb 0..999)"
        lines.append(f"P02 Lux direkt: {frame.lux_direct_p02:d} lx{valid}")
    lines.append(f"Wind raw: {frame.wind_raw:3d} -> {wind:6.3f} m/s Anzeige: {wind_disp:.1f} m/s")
    lines.append(f"Regen: {rain_text(frame.rain_raw)}")
    lines.append(f"Nacht/Status Byte 10: {night_text(frame)}")
    lines.append(f"Byte 11 unbekannt: {frame.unknown_11} / 0x{frame.unknown_11:02X}")
    return "\n".join(lines)


def main() -> int:
    """Read frames from SERIAL_PORT and print them until interrupted.

    Returns:
        0 after Ctrl+C, 1 on a serial error, 2 when pyserial is missing.
    """
    if serial is None:
        print("Fehler: pyserial ist nicht installiert. Installation: pip install pyserial", file=sys.stderr)
        return 2

    print(f"Öffne {SERIAL_PORT} mit {BAUDRATE} Baud ...")
    print("Abbruch mit Strg+C")
    print("COM-Port und Baudrate sind direkt oben im Skript eingetragen.")

    buffer = bytearray()
    try:
        with serial.Serial(SERIAL_PORT, BAUDRATE, timeout=SERIAL_TIMEOUT) as ser:
            while True:
                chunk = ser.read(128)
                if chunk:
                    buffer.extend(chunk)
                    # Drain every complete frame currently in the buffer.
                    while True:
                        frame_bytes = extract_next_frame(buffer)
                        if frame_bytes is None:
                            break
                        frame = parse_frame(frame_bytes)
                        if frame is None:
                            continue
                        print(format_frame(frame), flush=True)
                else:
                    time.sleep(0.02)
    except KeyboardInterrupt:
        print("\nBeendet.")
        return 0
    except serial.SerialException as e:
        print(f"Serieller Fehler: {e}", file=sys.stderr)
        return 1


if __name__ == "__main__":
    raise SystemExit(main())