Forum: Mikrocontroller und Digitale Elektronik Knobelei: Nachrichten-Queue funktioniert fehlerhaft - Tipps?


von Robert (Gast)


Lesenswert?

Hallo!

Ich benutze eine Nachrichten-Queue um nachrichten zwischen verschiedenen 
Prozessen in meinem AVR auszutauschen und um lokale Puffer einzusparen.

Funktionsweise:

Es existiert eine Array von Nachrichten-Structs. Die Nachrichten können 
in Form einer verketteteten Liste geordnet werden ("next"-Pointer"). 
Zudem existieren Pointer auf Head und Tail zum einfachen Duchlaufen und 
Hinzufügen von Nachrichten.

Die einzelnen Prozesse holen sich per "new_msg()" den Pointer auf einen 
noch leeren Slot und benutzen ihn. Wenn alle Slots belegt sind wird NULL 
zurückgeliefert.

In der Hauptschleife des Programms werden nun die Nachrichten 
nacheinander durchgegangen und den entsprechenden "process"-Functionen 
der Module übergeben (als Pointer). Möchte ein Modul die Nachricht noch 
Puffern wird "OK" zurückgeliefert und es wird einfach mit der nächsten 
Nachricht weiter gemacht. Wird etwas anderes zurückgeliefert, wird die 
Nachricht gelöscht.

Scheinbar mache ich beim Löschen oder beim Einfügen einen Fehler, denn 
nach ein paar Durchläufen ist zwar "msgs_head == NULL", aber es sind 
wenn man das Array per Zähler durchläuft noch Slots von Nachrichten 
besetzt.
1
/* container for message */
2
typedef struct msg_s msg_t;
3
4
struct msg_s
5
{
6
     uint8_t dest;
7
     uint8_t data[50];  // msg data
8
     msg_t* next;    // pointer to next message
9
};
10
11
typedef enum result_e
12
{
13
  OK = 0,
14
  FREE,
15
  EOVERFLOW,
16
  EUNKNOWN,
17
  ENOTSUPP,
18
} result_t;
19
20
21
#define NUM_MSGS  10
22
23
msg_t *msgs_tail;
24
msg_t *msgs_head;
25
msg_t msgs[NUM_MSGS];
26
27
void msg_init()
28
{
29
  uint8_t c;
30
  for (c = 0; c < NUM_MSGS; c++)
31
  {
32
    msgs[c].dest = 0x00; // Nachricht ist frei
33
    msgs[c].next = NULL;
34
  }
35
  msgs_tail = msgs_head = NULL;
36
}
37
38
void msg_task()
39
{
40
  msg_t* previous = NULL;
41
  msg_t* iterator = msgs_head;
42
  result_t result = EUNKNOWN;
43
  while ( (iterator) /*&& (result != OK)*/ )
44
  {
45
    // direct to appropriate dest
46
    switch (dest)
47
    {
48
    case 0x01:
49
      result = a_process(iterator);
50
      break;
51
    case 0x02:  
52
      result = b_process(iterator);
53
      break;
54
    default:  // error!
55
      result = EUNKNOWN;
56
    }
57
58
    // delete msgs that were not successfully processed to avoid queue stall
59
    if (result != OK)
60
    {
61
      iterator->dest = 0x00;  // mark as free/available
62
63
      if (iterator == msgs_tail)  // iterator was last msg
64
      {
65
        msgs_tail = previous;
66
      }
67
68
      if (iterator == msgs_head)  // iterator was first msg
69
      {
70
        msgs_head = iterator->next;
71
      }
72
      else
73
      {
74
        previous->next = iterator->next;
75
      }
76
    }
77
    else
78
    {
79
      previous = iterator;
80
    }
81
82
    // move on
83
    iterator = iterator->next;
84
  }
85
}
86
87
msg_t *msg_new()
88
{
89
  uint8_t c;
90
  msg_t *free_msg = NULL;
91
  // look for free msg slot in "msgs"
92
  for (c = 0; c < NUM_MSGS; c++)
93
  {
94
    if (msgs[c].dest == 0x00)
95
    {
96
      free_msg = &msgs[c];
97
      break;
98
    }
99
  }
100
101
  if (free_msg)
102
  {
103
    // set next to NULL
104
    free_msg->next = NULL;
105
    // now append
106
    if (msgs_tail != NULL)
107
    {
108
      msgs_tail->next = free_msg;
109
    }
110
    else
111
    {
112
      msgs_tail = free_msg;
113
      msgs_head = free_msg;
114
    }
115
  }
116
117
   return free_msg;
118
}

Ich würde mich sehr freuen wenn jemand mal schaut wo mein Denkfehler 
ist! Keine der Funktionen werden durch Interrupts aufgerufen! Die 
Pointer Head, Tail, Next etc. werden im sonstigen Code nirgendwo gelesen 
oder gar geschrieben.

Grüße
Robert

von Maxx (Gast)


Lesenswert?

Ja, dir fehlt vollkommen die Zugriffskontrolle.

Stell dir vor:
1
 if (iterator == msgs_head)  // iterator was first msg
2
      {
3
        msgs_head = iterator->next;
4
      }

Wird genau dann Unterbrochen, wenn iterator->next ausgelesen, aber 
msgs_head noch nicht gesetzt wurde. In der Unterbrechung wurde ein neues 
Element eingefügt direkt nach iterator. Sprich iterator->next war vorher 
NULL, neues elemt wird eingefügt iterator->next ist ein neues element. 
msgs_head wird auf NULL gesetzt.

Und noch viele anderen stellen.

Überleg wann du nicht unterbrochen werden darfst um das zu vermeiden, 
verhindere das und schau dann nochmal.

von Robert (Gast)


Lesenswert?

Hallo!

Wer soll mich denn unterbrechen? Ich schrieb "die Funktionen werden von 
KEINEN Interrupts aufgerufen!" Und KEIN anderer Codeteil außer den 
gezeigten ändert tail, head, msg!

Grüße
Robert

von MaWin (Gast)


Lesenswert?

>     switch (dest)

Welches dest

>    msgs_tail->next = free_msg;

Tail und Head bedeutungsmässig vertauscht.

>    // delete msgs that were not successfully processed to avoid queue stall

Und erfolgreich bearbeitete verstopfen sie ?

> a_process(

Wenn a-process oder b_process ein msg_new macht, stimmt nichts mehr von 
deinen Annahmen (previous etc.)


Versuch doch einfach mal

while(msgs_head)
{
  iterator=msgs_head;
  msgs_head=iterator->next;
  if(!msgs_head) msgs_tail=NULL;
  switch(iterator->dest)
  {
  :
  }
  iterator->dest=0; // nach Bearbeitung immer freigeben
}


und bei msgs_new Head und Tail richtigrum (obwohl es ja nur Namen sind)

msgs_new()
{
  uint8_t c;

  for(c=0;c<NUM_MSGS;c++)
  {
    if(!msgs[c].dest)
    {
        msgs[c].next=NULL;
        if(!msgs_tail) msgs_tail=msgs+c;
        if(msgs_head) msgs_head->next=msgs+c;
        msgs_head=msgs+c;
        break;
    }
  }
}

Eine free-list wäre wohl auch sinnvoll.

von Maxx (Gast)


Lesenswert?

Sorry, der Teil da unten ist mir nciht aufgefallen :-P

von Robert (Gast)


Lesenswert?

Hallo!

Danke für das Feedback. Habe jetzt des "dest" geändert und head und tail 
getauscht.
1
/* container for message */
2
typedef struct msg_s msg_t;
3
4
struct msg_s
5
{
6
     uint8_t dest;
7
     uint8_t data[50];  // msg data
8
     msg_t* next;    // pointer to next message
9
};
10
11
typedef enum result_e
12
{
13
  OK = 0,
14
  FREE,
15
  EOVERFLOW,
16
  EUNKNOWN,
17
  ENOTSUPP,
18
} result_t;
19
20
#define NUM_MSGS  10
21
22
msg_t *msgs_head;
23
msg_t *msgs_tail;
24
msg_t msgs[NUM_MSGS];
25
26
void msg_init()
27
{
28
  uint8_t c;
29
  for (c = 0; c < NUM_MSGS; c++)
30
  {
31
    msgs[c].dest = 0x00; // Nachricht ist frei
32
    msgs[c].next = NULL;
33
  }
34
  msgs_head = msgs_tail = NULL;
35
}
36
37
result_t a_process(msg_t* msg)
38
{
39
  if (externe_bedingung)
40
    return OK;
41
  else
42
    return FREE;
43
}
44
45
result_t b_process(msg_t* msg)
46
{
47
  if (externe_bedingung2)
48
    return OK;
49
  else
50
    return FREE;
51
}
52
53
void msg_task()
54
{
55
  msg_t* previous = NULL;
56
  msg_t* iterator = msgs_tail;
57
  result_t result = EUNKNOWN;
58
  while ( (iterator) )
59
  {
60
    // direct to appropriate dest
61
    switch (iterator->dest)
62
    {
63
    case 0x01:
64
      result = a_process(iterator);
65
      break;
66
    case 0x02:  
67
      result = b_process(iterator);
68
      break;
69
    default:  // error!
70
      result = EUNKNOWN;
71
    }
72
73
    // delete msgs that were not successfully processed to avoid queue stall
74
    if (result != OK)
75
    {
76
      iterator->dest = 0x00;  // mark as free/available
77
78
      if (iterator == msgs_head)  // iterator was last msg
79
      {
80
        msgs_head = previous;
81
      }
82
83
      if (iterator == msgs_tail)  // iterator was first msg
84
      {
85
        msgs_tail = iterator->next;
86
      }
87
      else
88
      {
89
        previous->next = iterator->next;
90
      }
91
    }
92
    else
93
    {
94
      previous = iterator;
95
    }
96
97
    // move on
98
    iterator = iterator->next;
99
  }
100
}
101
102
msg_t *msg_new()
103
{
104
  uint8_t c;
105
  msg_t *free_msg = NULL;
106
  // look for free msg slot in "msgs"
107
  for (c = 0; c < NUM_MSGS; c++)
108
  {
109
    if (msgs[c].dest == 0x00)
110
    {
111
      free_msg = &msgs[c];
112
      break;
113
    }
114
  }
115
116
  if (free_msg)
117
  {
118
    // set next to NULL
119
    free_msg->next = NULL;
120
    // now append
121
    if (msgs_head != NULL)
122
    {
123
      msgs_head->next = free_msg;
124
    }
125
    else
126
    {
127
      msgs_head = free_msg;
128
      msgs_tail = free_msg;
129
    }
130
  }
131
132
   return free_msg;
133
}

Folgende Anmerkungen zu MaWin's Vorschlägen:
1) behoben - sorry
2) behoben

3) Nein, dann werden die Nachrichten gepuffert. Genau das ist ja der 
Sinn der Sache. Sonst brauch ich in jedem Modul einen Puffer. Zudem 
meine Schleife ALLE Nachrichten durchiteriert - d.h. andere Nachrichten 
werden soweit möglich verarbeitet. Wenn ein Prozess eine Nachricht 
partout nicht freigibt (was nicht passiert - das sehe ich beim Debuggen) 
verringert sich halt der verfügbare Platz um eine Nachricht.

4) Durch ausführen von msg_new() wird ja INNERHALB der Queue nichts 
geändert, es wird etwas angehängtr. D.h. er gibt ein Element AM ENDE 
mehr, das auch sogar von der Schleife durchgelaufen wird. Da ich 
zwischendrin nichts einfüge wird die Abfolge in der Schleife nicht 
geändert. Oder?

5) Vielen Dank für den Code. Allerdings passt die eben nicht zu dem 
benötigten Verhalten was in 3) beschrieben ist. Eine Nachricht kann eben 
eine "interne" Nachricht sein, aber auch eine "externe" die gesendet 
werden soll. Wenn das momentan nicht möglich ist soll diese ja nicht 
gelöscht werden. Auch möchte ich diese nicht umkopieren, sondern direkt 
puffern. Also einfach "ok" zurück und im nächsten Durchlauf gucken ob 
sich etwas geändert hat. Derweil sollen natürlich andere Nachrichten 
weiter hinten in der Queue verarbeitet werden können.

Grüße
Robert

von Sascha W. (sascha-w)


Lesenswert?

Hallo,

sag mal ...

wenn jede Nachricht (bzw. der Puffer) eine konstante Größe hat, und in 
jedem Nachrichtenpuffer mit 'dest' der Belegungsstatus gespeichert wird 
dann kannst du dir die Verkettung auch sparen.
Packe deine msg_struct (mit dest und msg_data) in ein Array, dann kannst 
du dir freie Plätze suchen (dest=0) und neu belegen. Oder Nachrichen 
löschen indem du dest auf Null setzt.

Sascha

von Robert (Gast)


Lesenswert?

Hallo Sascha,

da hast du völlig recht. Allerdings möchte ich gerne die zeitliche 
Abfolge der Nachrichten beim Senden beibehalten. Also die älteste 
Nachricht (und das ist eben leider nicht die mit dem kleinsten Index) 
soll in jedem Durchgang als erstes an die Prozesse übergeben werden. Bei 
der Queue können die Pakete zwar zeitlich "aufrücken", aber nie eine 
Nachricht überholen.

Grüße
Robert

von Robert (Gast)


Lesenswert?

Hallo!

Um die Geschichte aufzulösen:

Der Fehler lag in der Allokation neuer Elemente:
1
msg_t *msg_new()
2
{
3
  [...]
4
5
    if (msgs_head != NULL)
6
    {
7
      msgs_head->next = free_msg;
8
    }
9
    else
10
    {
11
      msgs_head = free_msg;
12
      msgs_tail = free_msg;
13
    }
14
  }
15
16
   return free_msg;
17
}

muss lauten
1
msg_t *msg_new()
2
{
3
  [...]
4
5
    if (msgs_head != NULL)
6
    {
7
      msgs_head->next = free_msg;
8
    }
9
    else
10
    {
11
      msgs_tail = free_msg;
12
    }
13
    msgs_head = free_msg;
14
  }
15
16
   return free_msg;
17
}

Das msg_head muss natürlich IMMER auf die neue Nachricht zeigen. Nur das 
Update von head->next oder eben tail ist konditional.

Grüße
Robert

von Karl H. (kbuchegg)


Lesenswert?

Das sieht immer noch nicht richtig aus.
Ich rate dir:
Mal dir die Situation auf!

Wenn man das erste mal verpointerte Listenstrukturen baut, kommt man 
schnell mal durcheinander.
Mit einer Skizze, wie die einzelnen Einzelknoten über Pointer 
miteinander verbunden werden und welche Operation welchen Pointer wohin 
umsetzt, kann man das ganz gut aussortieren.

Das sei deine Liste, mit 3 Knoten



  msg_head       msg_tail
  +--------+     +--------+
  |   o    |     |   o    |
  +---|----+     +---|----+
      |              |
      |              +--------------------+
      v                                   v
   +---------+   +--->+---------+   +---->+---------+
   |         |   |    |         |   |     |         |
   | next: o-----+    | next: o-----+     | next: 0 |
   +---------+        +---------+         +---------+


Jetzt kommt deine New Funktion zum Zug. Sie hat bereits einen freien 
Knoten identifiziert und einen Pointer darauf in free_msg



  msg_head       msg_tail
  +--------+     +--------+
  |   o    |     |   o    |
  +---|----+     +---|----+
      |              |
      |              +--------------------+
      v                                   v
   +---------+   +--->+---------+   +---->+---------+
   |         |   |    |         |   |     |         |
   | next: o-----+    | next: o-----+     | next: 0 |
   +---------+        +---------+         +---------+




        +-->+----------+
        |   |          |
        |   | next:  0 |
        |   +----------+
        |
        +---------+
                  |
   free_msg       |
   +---------+    |
   |    o---------+
   +---------+


Das ist deine Ausgangssituation, wenn der Code hier
1
msg_t *msg_new()
2
{
3
  [...]
4
5
    if (msgs_head != NULL)

angelangt ist. Durchlaufe den Code weiter, ändere die Pointer so wie es 
dir der Code vorschreibt und sieh nach was rauskommt.

Alles was ám Ende nicht so aussieht

   msg_head       msg_tail
  +--------+     +--------+
  |   o    |     |   o    |
  +---|----+     +---|----+
      |              |
 +----+              +--------------------+
 |                                        v
 | +---------+   +--->+---------+   +---->+---------+
 | |         |   |    |         |   |     |         |
 | | next: o-----+    | next: o-----+     | next: 0 |
 | +---------+        +---------+         +---------+
 | ^
 | +-----------------------+
 +----------+              |
            v              |
        +-->+----------+   |
        |   |          |   |
        |   | next:  o-----+
        |   +----------+
        |
        +---------+
                  |
   free_msg       |
   +---------+    |
   |    o---------+
   +---------+

ist fehlerhaft, wenn das Element vorne eingefügt werden soll.


Noch ein guter Tip.

Teste deine Queue Verwaltung erst mal auf dem PC.
Dazu schreibst du dir noch eine Funktion, die dir die Queue komplett 
ausgeben kann.
Und dann schreibst du dir ein main() in dem du in wilder Folge einfügst 
bzw. rauslöschst. Fang aber einfach an! Im ersten Testprogramm genügt 
es, wenn du einen Knoten einfügst. Im zweiten dann 2 und im dritten 3. 
Dann löscht du nach den 3 eingefügten Knoten einen raus.
Wichtig ist aber: Nach jeder, und ich meine ausnahmslos jeder Operation, 
sei es einfügen oder löschen, lässt du dir die Queue ausgeben!
Wenn da in der Verpointerung was nicht stimmt, dann merkt man das 
mittels Ausgabe meistens sehr schnell.

Daher auch auf dem PC: Da hast du bessere Ausgabemöglichkeiten.

Danymische Datenstrukturen sehen einfach aus, können aber tricky sein. 
Überhaupt dann wenn man das noch nie gemacht hat. Da hilft dann nur: 
systematisch rangehen, das Raten einstellen und Fakten (in Form von 
Output) schaffen. Und Mitzeichnen um zu verstehen, welcher Pointer an 
welcher Stelle zu früh überschrieben wurde, noch nicht geschrieben 
wurde, vergessen wurde, etc.

von Karl H. (kbuchegg)


Lesenswert?

Das hier
1
   // delete msgs that were not successfully processed to avoid queue stall
2
    if (result != OK)
3
    {
4
      iterator->dest = 0x00;  // mark as free/available
5
6
      if (iterator == msgs_head)  // iterator was last msg
7
      {
8
        msgs_head = previous;
9
      }
10
11
      if (iterator == msgs_tail)  // iterator was first msg
12
      {
13
        msgs_tail = iterator->next;
14
      }
15
      else
16
      {
17
        previous->next = iterator->next;
18
      }
19
    }
20
    else
21
    {
22
      previous = iterator;
23
    }

sieht ebenfalls nach einem grossen Haufen Unsinn aus.

von Karl H. (kbuchegg)


Lesenswert?

Ausserdem hast du einen riesen Bock geschossen.

Bau deine Queue so auf, wie man das normalerweise macht.
Neue Elemente werden hinten angefügt und nicht vorne und die Queue wird 
von vorne nach hinten abgearbeitet.

Fügst du nämlich vorne ein, dann sind die ältesten Messages 
logischerweise hinten. Das heist aber auch, dass du die Queue von hinten 
nach vorne durchgehen musst, wenn du die Knoten in der Reihenfolge ihres 
Eintreffens abarbeiten willst. Nur: Wie machst du denn das mit einer 
einfach verketteten Liste, wenn die Verkettung genau in die andere 
Richtung zeigt?

Und die Vorgabe:
> Möchte ein Modul die Nachricht noch Puffern wird "OK" zurückgeliefert
> und es wird einfach mit der nächsten Nachricht weiter gemacht. Wird
> etwas anderes zurückgeliefert, wird die Nachricht gelöscht.

die würde ich mir noch mal gut überlegen.
Das Problem: Wenn ein Task eine Nachricht zurückstellt, was passiert 
dann mit den restlichen Nachrichten. Oder anders ausgedrückt: Wann wird 
eine zurückgestellte Nachricht erneut dem Task zur Bearbeitung 
vorgelegt? Werden erst alle anderen Nachrichten abgearbeitet, oder wird 
die Nachricht dem Task immer wieder erneut zugestellt, oder ....

Wenn ein Task eine Nachricht nicht sofort bearbeiten will, dann muss er 
selbst dafür sorgen, dass er sich das merkt. Er kann ja zb eine neue 
Nachricht in die Queue stellen. Geht die Kontrolle an den Verteiler 
zurück, dann löscht der auf jeden Fall die Nachricht. Für ihn ist die 
Sache damit erledigt und die Nachricht wird auf jeden Fall gelöscht. 
Alles andere führt nur zu Chaos, weil plötzlich keinerlei Reihenfolgen 
mehr definiert sind.

von MaWin (Gast)


Lesenswert?

> wird ja INNERHALB der Queue nichts geändert

Aber die von aussen auf die queue verweisenden Variablen
wie previous etc. bekommen eventuell unpassende Inhalte.

Deine Überlegungen scheinen mir unvollständig zu sein.

Du solltest BEVOR du programmierst erst mal eine in sich
konsistente Gedankenlogik haben, sonst wird das auch mit
dem Programm nichts.

von Robert (Gast)


Lesenswert?

Hallo Karl heinz Buchegger,

vielen Dank für die hinweise. Kann es sein, dass sich deine Hinweise auf 
das erste Posting mit der noch "falschen" Verwendung der Begriffe Head 
und Tail beziehen? Dann erklärt sich vl. auch zusammen mit der jetzt 
geposteten Korrektur der Einfügrroutine die deiner Meinung nach falsche 
Verkettung mit "Next"-Pointern. Denn ansonsten wird das von Dir 
beschriebene Einfügen felherfrei durchgeführt. Die verschiedenen 
Szenarien beim Einfügen und Löschen (Head&Tail sind null, nur Tail ist 
Null, beide sind verschieden etc.) hab ich bereits durchprobiert und 
keine weiteren Fehler gefunden.

Da die einzelnen Elemente auch Arrays von ca. 50 Byte bestehen möchte 
ich in Falle eines "Pufferns" nicht umkopieren sondern das ELement 
zurückstellen. Dann bleibt es einfach an seiner Stelle, was dazu führt 
das alte Elemente auch eher aufgrufen werden, was für meinennudng Sinn 
macht. Natürlich könnte ich auch auf das Umkopieren verzichten und das 
Element anhand der zeiger nach hinten sortieren - das wird allerdings 
nicht weniger kompliziert und bietet in meinemn Augen keinerlei 
Vorteile.

Was ansonsten irgendwelche Modifikationen angeht: Keine Interrupts und 
die externen Funktionen rufen nur (fein zeitlich hintereinander!) 
msg_new() oder msg_task() auch. Das einzige was die Funktionen dann 
bekommen ist ein zeiger auf die datenstruktur, an der Sie aber auch am 
Next-Pointer nicht herummanipulieren.

Grüße
Robert

Bitte melde dich an um einen Beitrag zu schreiben. Anmeldung ist kostenlos und dauert nur eine Minute.
Bestehender Account
Schon ein Account bei Google/GoogleMail? Keine Anmeldung erforderlich!
Mit Google-Account einloggen
Noch kein Account? Hier anmelden.