# coding: utf-8
# Test program for Prologix-compatible GPIB controllers.
#
# Basic program flow:
#   main(), including I/O initialisation
#   send config_controller  (command sequence concerning the ar488)
#   send ++auto 1, if auto == "once"
#   send config_dev         (initialisation of a measurement mode)
#   LOOP
#       send ++auto 1, if auto == "beforetrigger"
#       send triggercmd     (command that starts a measurement)
#       wait waittime
#       send ++read 10 or ++auto 1, if auto == "no" or auto == "beforereading"
#       read result with function getnl
#       send ++auto 0, if auto == "beforetrigger" or auto == "beforereading"
#   END LOOP
#
# Abort the program with Ctrl-C.

import time
import os
import sys
import socket

# The program was developed and used on linux, but it should be easy to port
# to windows. The termios module, which is used to set the serial parameters,
# is only available on linux. This is done in the function openfct. Either
# work with the default parameters there, or implement the corresponding
# windows counterparts in its else branch.
linux = True
if linux:
    import termios
#end if

isipaddr = False
if isipaddr:
    # allows the original prologix ethernet controller to be used for comparison
    controller = ("172.26.10.20", 1234)
else:
    # symbolic link to /dev/ttyACMx in my system:
    #controller = "/dev/tty-esp32-1"   # ar488
    #controller = "/dev/tty10c4_8a5f"  # china prologix controller
    controller = "/dev/ttyACM0"
#end if

# name of the device under test
device = "wavepro950"

# If auto == "no", ++read 10 is used for reading. The other settings all lead
# to a ++auto 1, but each at a different place in the code, see the functions
# read and main.
auto = "no"  # "no", "once", "beforereading", "beforetrigger", no --> ++read 10

# the wait time (seconds) between triggercmd and getnl or ++read 10
waittime = 5

# commands that are the same for all devices
config_controller = [
    b"++ifc\n",
    b"++mode 1\n",
    b"++savecfg 0\n",
    b"++eoi 1\n",
    b"++eos 2\n",
    b"++eot_char 10\n",
    b"++eot_enable 1\n",
    b"++read_tmo_ms 3000\n",
    b"++auto 0\n",
]
config_auto = [b"++auto 1\n"]
readcmd = b"++read 10\n"
autocmd = b"++auto 1\n"
autooffcmd = b"++auto 0\n"

# config_dev, triggercmd and the GPIB address are device specific.
if device == "hp3458a":
    devaddr = 21
    config_dev = [b"++addr %d\n" % devaddr, b"INBUF ON\n", b"TRIG HOLD\n",
                  b"++clr\n", b"NPLC 100\n", b"AZERO ON\n", b"DCV AUTO\n"]
    triggercmd = b"TRIG SGL\n"
elif device == "wavepro950":
    devaddr = 18
    config_dev = [b"++addr %d\n" % devaddr,
                  b"stop;chdr off;pacl;clsw;aset;wait;trmd single\n"]
    triggercmd = b"arm;wait;c1:pava? rms\n"
elif device == "wavetek1271":
    devaddr = 10
    config_dev = [b"++addr %d\n" % devaddr, b"++clr\n", b"TRG SRC EXT\n",
                  b"DCV AUTO,DELAY DFLT,RESL6\n"]
    triggercmd = b"*TRG;RDG?\n"
elif device == "hp34401a":
    devaddr = 7
    config_dev = [b"++addr %d\n" % devaddr, b"*cls;*ese 1\n",
                  b"trig:sour imm\n", b'sens:func "volt:dc"\n',
                  b"samp:coun 1\n", b"sens:volt:dc:rang:auto on\n"]
    triggercmd = b"read?\n"
elif device == "nrvd":
    devaddr = 8
    config_dev = [b"++addr %d\n" % devaddr, b"disp on\n",
                  b"diag:test1 off;*cls;*ese 0;*sre 0;:stat:pres\n",
                  b'sens1:func "pow:ac"\n', b"sens1:pow:rang:auto on\n",
                  b"sens1:corr:fref:stat on\n", b"calc1:filt:auto on\n",
                  b"disp1:ann:pow:res \"medium\"\n"]
    triggercmd = b"inp:sel \"A\";*trg\n"
else:
    raise ValueError("no cmds defined for %s" % device)
#end if


# I/O routines
class IO:
    """Byte transport to the controller plus newline-delimited reading.

    Depending on the module-level flag ``isipaddr`` the transport is a TCP
    socket or an unbuffered device file; ``controller`` holds the address.
    Received bytes are collected in a fixed-size buffer of ``nlbufsize``
    bytes from which getnl() cuts out one message per call.
    """

    def __init__(self):
        self.nlbufsize = 4096
        self._buffer = bytearray(self.nlbufsize)
        self._vbuf = memoryview(self._buffer)  # zero-copy window for get()
        self._nlast = 0  # index of the first byte not yet handed out
        self._nget = 0   # index one past the last byte received
        self.openfct()
    #end __init__

    def openfct(self):
        """Open the connection to ``controller`` and store it in self.fid.

        For a device file on linux the tty is additionally switched to a
        raw 8N1 configuration without flow control, echo or CR/LF
        translation.
        """
        if isipaddr:
            self.fid = socket.create_connection(controller, 10)
            self.fid.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        else:
            self.fid = open(controller, "r+b", buffering=0)
            if linux:
                # configure the cdc line /dev/ttyACMx
                # default flags settings of the cdc-acm usb driver:
                #
                # struct ktermios tty_std_termios = {
                #     /* for the benefit of tty drivers */
                #     .c_iflag = ICRNL | IXON,
                #     .c_oflag = OPOST | ONLCR,
                #     .c_cflag = B38400 | CS8 | CREAD | HUPCL,
                #     .c_lflag = ISIG | ICANON | ECHO | ECHOE | ECHOK |
                #                ECHOCTL | ECHOKE | IEXTEN,
                #     .c_cc = INIT_C_CC,
                #     .c_ispeed = 38400,
                #     .c_ospeed = 38400,
                #     /* .c_line = N_TTY, */
                # };
                try:
                    fid = self.fid.fileno()
                    flags = termios.tcgetattr(fid)
                    # c_iflag
                    flags[0] &= ~(termios.IXON | termios.IXOFF
                                  | termios.IXANY | termios.ICRNL)
                    # c_oflag
                    flags[1] &= ~(termios.OPOST | termios.ONLCR)
                    # c_cflag
                    flags[2] &= ~(termios.CRTSCTS | termios.PARENB
                                  | termios.CSTOPB | termios.CSIZE)
                    flags[2] |= termios.CREAD | termios.CLOCAL | termios.CS8
                    # c_lflag
                    flags[3] &= ~(termios.ICANON | termios.ECHO | termios.ECHOE
                                  | termios.ISIG | termios.IEXTEN)
                    # NOTE(review): B1152000 is 1,152,000 baud, not 115200 --
                    # confirm that this is the intended speed constant.
                    flags[4] = termios.B1152000  # ispeed
                    flags[5] = termios.B1152000  # ospeed
                    termios.tcsetattr(fid, termios.TCSANOW, flags)
                except BaseException:
                    # do not leak the device handle if the setup fails
                    self.fid.close()
                    raise
                #end try
            else:
                # windows
                pass
            #end if
        #end if
    #end openfct

    def close(self):
        """Close the socket or device file opened by openfct()."""
        self.fid.close()
    #end close

    def get(self, buffer):
        """Read available bytes into ``buffer`` and return their count.

        Raises EOFError when the read delivers zero bytes.
        """
        #print(len(buffer))
        if isipaddr:
            n = self.fid.recv_into(buffer)
        else:
            n = self.fid.readinto(buffer)
        #end if
        if n == 0:
            raise EOFError("EOF in io.get")
        #end if
        return n
    #end get

    def write(self, buf):
        """Send all of ``buf`` to the controller.

        A single send()/write() call may accept only part of the data, so
        the socket path uses sendall() and the file path loops until every
        byte has been written.

        Raises OSError if the device accepts no data at all.
        """
        if isipaddr:
            self.fid.sendall(buf)
        else:
            pending = memoryview(buf)
            while len(pending):
                written = self.fid.write(pending)
                if not written:
                    raise OSError("io.write: device accepted no data")
                #end if
                pending = pending[written:]
            #end while
        #end if
    #end write

    def getnl(self):
        r"""Return the next message, terminated by any run of \n and \r.

        Leading terminator bytes are skipped. More data is read with get()
        until a terminator follows the message; the terminator itself is
        not part of the returned bytearray.

        Raises OSError if a message does not fit into the buffer or if the
        connection delivers EOF.
        """
        try:
            st = time.time()
            nbrread = 0
            debugprint("getnl start")
            while True:
                eaten = 0
                # skip terminators left over in front of the next message
                while (self._nlast < self._nget
                       and self._buffer[self._nlast] in b"\r\n"):
                    self._nlast += 1
                    eaten += 1
                #end while
                if self._nlast != self._nget:
                    npos = self._nlast
                    while (npos < self._nget
                           and self._buffer[npos] not in b"\r\n"):
                        npos += 1
                    #end while
                    if npos != self._nget:
                        # isolate properly terminated message
                        oldlast = self._nlast
                        self._nlast = npos + 1
                        debugprint("readtime=%.6f, nbr read calls=%d"
                                   % (time.time() - st, nbrread))
                        return self._buffer[oldlast:npos]
                    elif self._nget == self.nlbufsize:
                        if self._nlast == eaten:
                            # no reset necessary, the application will terminate
                            raise OSError("getnl: no newline found, message does not fit in buffer")
                        else:
                            # no \n found and no room in the buffer to read
                            # further bytes: move the partial message to the
                            # buffer start to free space, then read more
                            n = self._nget - self._nlast
                            self._buffer[:n] = self._buffer[self._nlast:self._nget]
                            self._nlast = 0
                            self._nget = n
                        #end if
                    #end if
                    # otherwise the message is still unterminated (may be a
                    # programming error or a broken line): just read more
                else:
                    # everything consumed, reset buffer
                    self._nlast = self._nget = 0
                #end if
                n = self.get(self._vbuf[self._nget:])
                debugprint("read: %s" % (repr(self._buffer[self._nlast:self._nget+n],)))
                nbrread += 1
                self._nget += n
            #end while
        except EOFError as err:
            self._nlast = self._nget = 0  # reset buffer
            raise OSError("getnl: socket/file closed") from err
        #end try
    #end getnl
#end IO


def sendseq(io, seq):
    """Write every command of ``seq`` to ``io`` in order, logging each one."""
    for cmd in seq:
        io.write(cmd)
        debugprint("->", cmd)
    #end for
#end sendseq


def read(io):
    """Fetch one reading from the device and return it.

    Depending on ``auto`` the read is requested with ``readcmd`` ("no") or
    by switching auto mode on around the read ("beforereading"); for the
    other settings nothing is sent here.
    """
    if auto == "no":
        io.write(readcmd)
        debugprint("->: %s" % readcmd)
    elif auto == "beforereading":
        io.write(autocmd)
        debugprint("->: %s" % autocmd)
    #end if
    s = io.getnl()
    debugprint("<-: %s" % (repr(s),))
    if auto == "beforereading":
        io.write(autooffcmd)
        debugprint("->: %s" % autooffcmd)
    #end if
    return s
#end read


def main():
    """Configure controller and device, then trigger and read forever.

    The loop only ends through an exception (e.g. Ctrl-C); the connection
    is closed on the way out.
    """
    io = IO()
    try:
        sendseq(io, config_controller)
        if auto == "once":
            sendseq(io, config_auto)
        #end if
        sendseq(io, config_dev)
        while True:
            if auto == "beforetrigger":
                io.write(autocmd)
                debugprint("->: %s" % autocmd)
            #end if
            io.write(triggercmd)
            debugprint("->: %s" % triggercmd)
            time.sleep(waittime)
            read(io)
            if auto == "beforetrigger":
                io.write(autooffcmd)
                debugprint("->: %s" % autooffcmd)
            #end if
        #end while
    finally:
        io.close()
    #end try
#end main


# debug print part, not of further interest
starttime = time.time()
cnt = 80       # total width of an output line
mlines = 4     # maximum number of message lines printed
baselen = 16   # width reserved in front of the message
lc = cnt - baselen  # message characters per output line


def timer():
    """Return the seconds elapsed since ``starttime``.

    Once more than a day has elapsed, ``starttime`` is moved to the
    current time so that the following timestamps start from zero again.
    """
    global starttime
    t = time.time() - starttime
    if t > 86400.0:
        # starttime + t is "now"; the former `t - starttime` produced a
        # large negative start time and thus nonsensical timestamps
        starttime += t
    #end if
    return t
#end timer


def debugprint(header="", msg="", msgtype="text"):
    """Print a timestamped debug line, optionally followed by a message.

    header  -- text printed right after the timestamp
    msg     -- str, bytes, bytearray, memoryview, list, dict, int or float;
               other types are silently ignored
    msgtype -- "text": bytes are decoded as ascii, CR/LF shown as \\r, \\n
               "binary": bytes-like messages are shown as a hex dump
               "raw": msg (a str) is printed unmodified and untruncated

    Text and binary messages are truncated to ``mlines`` output lines and
    wrapped every ``lc`` characters.
    """
    if msgtype == "raw":
        print("% 13.6f " % timer() + header + ":\n" + msg)
    elif isinstance(msg, (int, float)):
        print("% 13.6f " % timer() + header + ": " + str(msg))
    elif not len(msg):
        print("% 13.6f " % timer() + header)
    else:
        pm = "% 13.6f " % timer() + header + ":\n"
        if msgtype == "binary" and isinstance(msg, (memoryview, bytes, bytearray)):
            # a hex dump needs three characters per byte
            maxmsg = (cnt - baselen) * mlines // 3
            repcrlf = False
        else:
            maxmsg = (cnt - baselen) * mlines
            repcrlf = True
        #end if
        if isinstance(msg, memoryview):
            msg = bytes(msg[:maxmsg])
            if msgtype == "text":
                msg = msg.decode("ascii", "ignore")
            else:  # binary
                msg = msg.hex(" ", 1)
            #end if
        elif isinstance(msg, (list, dict)):
            msg = repr(msg)
        elif isinstance(msg, (bytes, bytearray)):
            if msgtype == "text":
                msg = msg[:maxmsg].decode("ascii", "ignore")
            else:  # binary
                msg = msg[:maxmsg].hex(" ", 1)
            #end if
        elif isinstance(msg, str):
            msg = msg[:maxmsg]
        else:
            return
        #end if
        if repcrlf:
            msg = msg.replace("\n", "\\n")
            msg = msg.replace("\r", "\\r")
        #end if
        # wrap the message into indented lines of at most lc characters
        pm += "".join("\t\t" + msg[sm:sm + lc] + "\n"
                      for sm in range(0, len(msg), lc))
        sys.stdout.write(pm)
    #end if
#end debugprint


if __name__ == "__main__":
    main()
#end if