import time from machine import Pin, Timer from micropython import const PULSE_NONE = const(0) PULSE_FALSE = const(1) PULSE_TRUE = const(2) PULSE_ERROR = const(3) PULSE_MIN_BEGIN = const(4) # Anzahl der Flankenpaare, die fuer die automatische Erkennung gesammelt werden AUTODETECT_SAMPLES = const(10) DCF77_PIN = const(2) # GPIO-Pin des DCF77-Empfaengers WEEKDAY_NAMES = ["", "Mo", "Di", "Mi", "Do", "Fr", "Sa", "So"] class dcf77: # ------------------------------------------------------------------ # Timer-ISR: so minimal wie moeglich # Nur Zeitstempel + Rohsignal lesen, Flag setzen # Keinerlei Logik, keine Listenoperationen, kein GC-Risiko # ------------------------------------------------------------------ def _timerTick(self, t): self._isrTime = time.ticks_ms() self._isrSignal = self.dcfPin.value() self._isrFlag = True def __init__( self, dcf_pin: Pin, inverted: bool = None, # None = automatische Erkennung false_time: list[int] = [50, 130], true_time: list[int] = [150, 230], pause_time: list[int] = [1700, 2500] ): self.FALSE_TIME = false_time self.TRUE_TIME = true_time self.TICK59_TIME = pause_time self.DEBUG = False self.dcfPin = Pin(dcf_pin, mode=Pin.IN, pull=Pin.PULL_UP) # Automatische Erkennung vorbereiten if inverted is None: self.inverted = False # vorlaeufig; wird nach Lernphase gesetzt self._autodetect = True self._rising_widths = [] # Breiten gemessen an steigender Flanke self._falling_widths = [] # Breiten gemessen an fallender Flanke else: self.inverted = inverted self._autodetect = False # Zustand fuer die Flankenauswertung in der Hauptschleife self.pulse = PULSE_NONE self._oldSignal = False self._lastEdgeTime = 0 self.deltaTime = 0 # ISR-Übergabevariablen (werden nur im ISR geschrieben, nur in update() gelesen) self._isrFlag = False self._isrTime = 0 self._isrSignal = 0 self._tim = Timer(period=10, mode=Timer.PERIODIC, callback=self._timerTick) # ------------------------------------------------------------------ # Prueft ob eine Breite in ein DCF77-Pulsfenster faellt # ------------------------------------------------------------------ def _is_valid_pulse(self, width): return self.FALSE_TIME[0] <= width <= self.TRUE_TIME[1] # ------------------------------------------------------------------ # Lernphase: Flankenbreiten sammeln und inverted bestimmen # ------------------------------------------------------------------ def _autodetect_update(self, delta, rising): if rising: self._rising_widths.append(delta) else: self._falling_widths.append(delta) total = len(self._rising_widths) + len(self._falling_widths) if total < AUTODETECT_SAMPLES * 2: return rising_score = sum(1 for w in self._rising_widths if self._is_valid_pulse(w)) falling_score = sum(1 for w in self._falling_widths if self._is_valid_pulse(w)) # Die Flanke mit mehr gueltigen Pulsbreiten beendet den Puls. # Normal (inverted=False): Puls endet an der fallenden Flanke # Invertiert (inverted=True): Puls endet an der steigenden Flanke self.inverted = rising_score > falling_score self._autodetect = False if self.DEBUG: print("Autodetect: rising={} falling={} -> inverted={}".format( rising_score, falling_score, self.inverted)) # ------------------------------------------------------------------ # Flankenauswertung: muss regelmaessig aus der Hauptschleife aufgerufen werden # Gibt True zurueck wenn ein neuer Puls in self.pulse bereitsteht # ------------------------------------------------------------------ def update(self): # ISR-Flag atomar lesen und loeschen if not self._isrFlag: return False self._isrFlag = False now = self._isrTime rawSignal = bool(self._isrSignal) led.value(rawSignal) # Signal normalisieren (nach Lernphase) dcfSignal = rawSignal if self._autodetect else (rawSignal ^ self.inverted) if self._lastEdgeTime == 0: # Erster Aufruf: nur initialisieren, noch keine Auswertung self._lastEdgeTime = now self._oldSignal = dcfSignal return False # Delta nur bei Flanke berechnen delta = 0 if self._oldSignal != dcfSignal: delta = time.ticks_diff(now, self._lastEdgeTime) self.deltaTime = delta # Fallende Flanke (logisch) if self._oldSignal and not dcfSignal: if self._autodetect: self._autodetect_update(delta, rising=False) else: # Puls endet hier: Breite auswerten if self.TRUE_TIME[0] <= delta <= self.TRUE_TIME[1]: self.pulse = PULSE_TRUE elif self.FALSE_TIME[0] <= delta <= self.FALSE_TIME[1]: self.pulse = PULSE_FALSE else: self.pulse = PULSE_ERROR # Steigende Flanke (logisch) elif not self._oldSignal and dcfSignal: if self._autodetect: self._autodetect_update(delta, rising=True) else: # Pause endet hier: Laenge auswerten if self.TICK59_TIME[0] <= delta <= self.TICK59_TIME[1]: self.pulse = PULSE_MIN_BEGIN elif delta >= self.TRUE_TIME[1]: pass # normale Sekunden-Pause (~800 ms): kein Fehler else: self.pulse = PULSE_ERROR self._oldSignal = dcfSignal self._lastEdgeTime = now return self.pulse != PULSE_NONE # ------------------------------------------------------------------ # Hilfsfunktion: BCD-Dekodierung (LSB zuerst) # ------------------------------------------------------------------ def _bcd(self, bits, start, length): weights = [1, 2, 4, 8, 10, 20, 40, 80] val = 0 for i in range(length): if bits[start + i] == 1: val += weights[i] return val # ------------------------------------------------------------------ # Hilfsfunktion: gerade Paritaetspruefung ueber bits[start..end] # ------------------------------------------------------------------ def _parity_even(self, bits, start, end): return sum(bits[start:end + 1]) % 2 == 0 # ------------------------------------------------------------------ # DCF77-Dekodierung # # DCF77-Bitstruktur (Bit 0 zuerst, 59 Bits pro Minute): # Bit 0 : Startbit, immer 0 # Bits 1-14 : Wetterdaten / reserviert # Bit 15 : Rufbit Antennen # Bit 16 : Reserviert # Bit 17 : MESZ aktiv (1 = Sommerzeit) # Bit 18 : MEZ aktiv (1 = Winterzeit) # Bit 19 : Schaltsekunde angekuendigt # Bit 20 : Startbit Zeitinfo, immer 1 # Bits 21-27: Minute (BCD, LSB zuerst) # Bit 28 : Paritaet Minute (gerade Paritaet ueber Bits 21-28) # Bits 29-34: Stunde (BCD, LSB zuerst) # Bit 35 : Paritaet Stunde (gerade Paritaet ueber Bits 29-35) # Bits 36-41: Tag (BCD, LSB zuerst) # Bits 42-44: Wochentag (1=Mo .. 7=So) # Bits 45-49: Monat (BCD, LSB zuerst) # Bits 50-57: Jahr (BCD, LSB zuerst, zweistellig) # Bit 58 : Paritaet Datum (gerade Paritaet ueber Bits 36-58) # # Gibt bei Erfolg einen Dict mit Zeitinfo zurueck, bei Fehler None. # ------------------------------------------------------------------ def decode(self, bits): if len(bits) < 59: print("DECODE ERROR: nur {} Bits empfangen (59 erwartet)".format(len(bits))) return None # Auf Empfangsfehler pruefen (-1 im relevanten Bereich) if any(bits[i] == -1 for i in range(20, 59)): print("DECODE ERROR: fehlerhafte Bits im Zeitbereich empfangen") return None # Startbits pruefen if bits[0] != 0: print("DECODE ERROR: Bit 0 ist nicht 0 (Startbit)") return None if bits[20] != 1: print("DECODE ERROR: Bit 20 ist nicht 1 (Zeitstartbit)") return None # Paritaeten pruefen if not self._parity_even(bits, 21, 28): print("DECODE ERROR: Paritaetsfehler Minute (Bits 21-28)") return None if not self._parity_even(bits, 29, 35): print("DECODE ERROR: Paritaetsfehler Stunde (Bits 29-35)") return None if not self._parity_even(bits, 36, 58): print("DECODE ERROR: Paritaetsfehler Datum (Bits 36-58)") return None # Zeitwerte dekodieren minute = self._bcd(bits, 21, 7) hour = self._bcd(bits, 29, 6) day = self._bcd(bits, 36, 6) weekday = self._bcd(bits, 42, 3) month = self._bcd(bits, 45, 5) year = self._bcd(bits, 50, 8) mesz = bits[17] # 1 = Sommerzeit (MESZ), 0 = Winterzeit (MEZ) tz_str = "MESZ" if mesz else "MEZ" wd_str = WEEKDAY_NAMES[weekday] if 1 <= weekday <= 7 else "??" print(">>> Zeit : {:02d}:{:02d} {}".format(hour, minute, tz_str)) print(">>> Datum : {} {:02d}.{:02d}.20{:02d}".format(wd_str, day, month, year)) return { "hour": hour, "minute": minute, "day": day, "weekday": weekday, "month": month, "year": 2000 + year, "mesz": bool(mesz) } #-------------------------------------------------------------------------------------------- led = Pin("LED", mode=Pin.OUT) # WL_GPIO0 (WLAN-Chip) als Ausgang fuer die LED PULSE_NAMES = { PULSE_NONE : "PULSE_NONE ", PULSE_FALSE : "PULSE_FALSE ", PULSE_TRUE : "PULSE_TRUE ", PULSE_ERROR : "PULSE_ERROR ", PULSE_MIN_BEGIN : "PULSE_MIN_BEGIN", } def main(): # inverted=None -> automatische Erkennung (Standard, ca. 20 Sekunden Lernphase) # inverted=False -> normales Modul (Puls endet an fallender Flanke) # inverted=True -> invertiertes Modul (Puls endet an steigender Flanke) dcf = dcf77(DCF77_PIN) # Bit-Puffer fuer eine DCF77-Minute (59 Bits, Bit 0 zuerst) bits = [] bitcnt = 0 # Bitzaehler, beginnt bei 0 nach jedem Minutenstart while True: if dcf.update(): p = dcf.pulse dcf.pulse = PULSE_NONE if p == PULSE_MIN_BEGIN: # Minutenpause erkannt: abgeschlossene Minute auswerten, Puffer und Zaehler leeren print("--- Minutenbeginn: {} Bits empfangen ---".format(len(bits))) print("Bits: " + "".join(str(b) for b in bits)) dcf.decode(bits) bits = [] bitcnt = 0 elif p == PULSE_TRUE: bits.append(1) print(PULSE_NAMES.get(p, "UNKNOWN") + " deltaTime:{0:6} Bit#{1:02d}: 1".format( dcf.deltaTime, bitcnt)) bitcnt += 1 elif p == PULSE_FALSE: bits.append(0) print(PULSE_NAMES.get(p, "UNKNOWN") + " deltaTime:{0:6} Bit#{1:02d}: 0".format( dcf.deltaTime, bitcnt)) bitcnt += 1 else: # PULSE_ERROR: Bit nicht verwertbar, Platzhalter eintragen bits.append(-1) print(PULSE_NAMES.get(p, "UNKNOWN") + " deltaTime:{0:6} Bit#{1:02d}: ?".format( dcf.deltaTime, bitcnt)) bitcnt += 1 if __name__ == '__main__': main()