Multifunction DAQ


real time read/plot using python

Solved!

Hi,

I am using Python 3 together with the nidaqmx package for my data acquisition and hardware control. For one of our applications we want to continuously read from the buffer and plot the data in real time. So far I have found no way to do this.

I figured the sample_mode has to be set to CONTINUOUS, but I couldn't find a method to use the acquired data directly for a real-time plot, so I do not know how to proceed from there.

 

In the NI-DAQmx Python documentation I read that real-time methods are not yet incorporated in the nidaqmx package.

However, it was not entirely clear to me if that means that real time plotting of the data is not (yet) possible with Python and the nidaqmx package. 

 

TL;DR

So my question is:

Is it currently possible to continuously acquire data and plot it in real time using Python together with the nidaqmx package?

If so, what are the methods/protocol from the nidaqmx package that are needed to establish this?

 

Thanks in advance for your response.

 

Message 1 of 6

For people having the same problem:

So far, someone sent me this video from National Instruments on YouTube:
https://www.youtube.com/watch?v=NMMRbPvkzFs 

 

However, the video uses software timing.

It is still not clear to me whether the same thing can be achieved using hardware timing, and I am still interested if someone knows how.

Message 2 of 6

Hi LHD,

 

The YouTube video you posted is a good source, but it is indeed software-timed.

 

For hardware timing you would use nidaqmx.task.timing to assign a sample clock and a sample rate to your task:

https://nidaqmx-python.readthedocs.io/en/latest/timing.html

 

Then you just need to make sure to call the read function regularly to read all the samples automatically acquired by your card.

 

I found a short introduction with timing example here:

https://www.pythonforthelab.com/blog/controlling-a-national-instruments-card-with-python/
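To make the hardware-timed approach concrete, here is a minimal sketch (not from the linked tutorial; the device name Dev1, channel ai0, and the rate/chunk values are assumptions). The `chunk_size` helper just shows the arithmetic relating sample rate and read interval:

```python
import numpy as np

def chunk_size(rate, read_interval):
    """Samples that accumulate per read interval at a given sample rate."""
    return int(rate * read_interval)

def read_hw_timed(device="Dev1", rate=1000.0, chunk=100, n_chunks=10):
    """Hardware-timed continuous acquisition: the card's sample clock
    paces the acquisition, and we read it out in fixed-size chunks."""
    import nidaqmx  # imported here so the sketch loads without the driver installed
    from nidaqmx.constants import AcquisitionType

    data = []
    with nidaqmx.Task() as task:
        task.ai_channels.add_ai_voltage_chan(device + "/ai0")
        # Sample clock on the card => hardware timing, not software sleep()
        task.timing.cfg_samp_clk_timing(rate,
                                        sample_mode=AcquisitionType.CONTINUOUS,
                                        samps_per_chan=chunk)
        task.start()
        for _ in range(n_chunks):
            # Blocks until `chunk` samples are available, keeping the buffer drained
            data.append(task.read(number_of_samples_per_channel=chunk))
    return np.concatenate(data)
```

The key point is that the read call blocks until the requested number of samples exists, so looping over it regularly is enough to keep up with the card.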

 

 

Real-time

I think there is some confusion about what we mean by real-time. A good introduction can be found here:

https://en.wikipedia.org/wiki/Real-time_operating_system

In a nutshell, it means deterministic scheduling of processes; the DAQmx functions that make use of real-time OS features are not available in Python.

 

If you are interested in seeing which functions you are missing, you can have a look at the LabVIEW help for DAQmx (as they are obviously not documented for Python):

http://zone.ni.com/reference/en-XX/help/370469AP-01/lvdaqmx/palrealtime/

 

I hope this helps

Andreas
CLA, CTA
Message 3 of 6
Solution
Accepted by topic author LHD

Hi,

 

Thanks for the input. I implemented it in the following way:

 

"""Starts writing a waveform continuously to the patch clamp while reading the buffer periodically."""
# Taking the latest configurations for the ports (I saved the configurations
# somewhere else, but they represent the device and port paths)
configs = Configuration()
self.patchVoltOutChan = configs.patchVoltOutChannel
self.patchCurOutChan = configs.patchCurOutChannel
self.patchVoltInChan = configs.patchVoltInChannel

# DAQ
with nidaqmx.Task() as writeTask, nidaqmx.Task() as readTask:
    writeTask.ao_channels.add_ao_voltage_chan(self.patchVoltInChan)
    readTask.ai_channels.add_ai_voltage_chan(self.patchVoltOutChan)
    readTask.ai_channels.add_ai_voltage_chan(self.patchCurOutChan)

    readTask.timing.cfg_samp_clk_timing(
        rate=self.sampleRate,
        sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS,
        samps_per_chan=self.readNumber)
    writeTask.timing.cfg_samp_clk_timing(
        rate=self.sampleRate,
        sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS)

    reader = AnalogMultiChannelReader(readTask.in_stream)
    writer = AnalogSingleChannelWriter(writeTask.out_stream)

    wave = np.ones(5) * 3  # Dummy waveform
    writer.write_many_sample(wave)

    # Reading data from the buffer in a loop
    output = np.zeros([2, self.readNumber])
    writeTask.start()
    readTask.start()
    while not self.isInterruptionRequested():  # Loop until interruption is requested (part of UI in PyQt5 (QThread))
        reader.read_many_sample(
            data=output,
            number_of_samples_per_channel=self.readNumber)
        # Emitting the data just received as a signal
        output = np.around(output, 2)  # Round all values to 2 decimals to avoid overflow
        self.measurement.emit(output[0, :], output[1, :])  # To be processed in another file (PyQt5)
        # Pausing before reading the buffer again
        time.sleep(self.readInterval)

The code is part of a small user interface (PyQt5) and embedded in a QThread, so it runs in the background while the user interface remains available.

 

To avoid the buffer getting full, I would recommend reading slightly more samples than are expected to be in the buffer at that time. This way the read has to wait slightly longer for the last few samples, which then leaves you with an empty buffer and a fixed-size numpy array each time you read.
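That sizing rule can be written down as a small pure-Python helper. This is my own illustration, not part of the code above; the function name and the 25% headroom factor are assumptions:

```python
def samples_per_read(sample_rate, read_interval, headroom=1.25):
    """Pick a read size slightly larger than the number of samples
    expected to accumulate during one read interval.  Requesting a
    bit more than is available makes read_many_sample block briefly
    for the last few samples, so each read returns a fixed-size
    array and leaves the buffer empty."""
    return int(sample_rate * read_interval * headroom)

# e.g. at 2 kHz with a 0.1 s read interval, ~200 samples accumulate,
# so each read requests 250 samples and drains the buffer
n = samples_per_read(2000, 0.1)
```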

 

This code works for me up to approximately 2-3 kHz.

For higher frequencies some better buffer management is needed. 

 

If there are any questions I will be happy to answer them :).

 

Kind regards,

 

Lex 

Message 4 of 6

While I do realize this thread was last updated in 2019 and it is now 2025, I am desperate. I am using a method similar to this to live-stream data from the DAQ. The data is streamed in one thread, a live plot is updated in another, and the data is processed in a third. However, when filtering the data I've noticed a small error: there is a slight lag or jump between readings that ruins the signal when it is passed through a filter to remove 60 Hz noise.

Here is how I have implemented the reading.

    def collect_without_p(self,b):
        

        with ni.Task() as ai_task, ni.Task() as ao_task:
            for i in range(0,3):
                ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai"+str(i),terminal_config = TerminalConfiguration.RSE )
                ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai"+str(i+8),terminal_config = TerminalConfiguration.RSE )
            ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai3",terminal_config = TerminalConfiguration.RSE ) 
            for i in range(0,self.p.pressure): 
                ai_task.ai_channels.add_ai_voltage_chan("Dev1/ai"+str(i+4),terminal_config = TerminalConfiguration.DIFF, min_val=-0.2,max_val= 0.2)
            ai_task.timing.cfg_samp_clk_timing(self.p.freq,sample_mode= AcquisitionType.CONTINUOUS, samps_per_chan= self.chunk )
            ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao0") 
            ao_task.ao_channels.add_ao_voltage_chan("Dev1/ao1") 
            reader = AnalogMultiChannelReader(ai_task.in_stream)

            data = [[5],[-5]]
            ao_task.write(data) 
            
            # starting the task with a try/finally so a
            # keyboard interrupt can stop the data collection
            ao_task.start()
            ai_task.start()
            try:
                # initializing start time and number of reads
                # the read count isn't necessary; it just prints the data to the terminal
                # time is initialized so elapsed time can be found
                i = 0
                output = np.zeros([len(self.p.titles)-1, self.chunk])
                #previous = time.time()
                while  not b.is_set():
                    #current = time.time()
                    #print(current - previous)
                    #previous = current
                    reader.read_many_sample(data = output,number_of_samples_per_channel = self.chunk,timeout=(self.chunk/self.freq))
                    self.p.add(output)
                # Once loop is exited read remaining samples from buffer
                ra = ai_task.in_stream.avail_samp_per_chan
                print(f"Points to read: {ra}")
                for  i in range(0,ra):
                    self.p.add(ai_task.read())


            finally:
                ai_task.stop()
                ao_task.stop()
Message 5 of 6

Hello,

 

Your issue is not very clear. What is the lag and jump you mention?

 

I am doing the same (see posts authored by ft_06, e.g. https://forums.ni.com/t5/Multifunction-DAQ/How-to-feed-callback-data-parameter-to-the-callback-passe...): acquiring AI channels at up to 1 MHz sampling on several cards connected together and drawing the results live. I use register_every_... to get a callback invoked when the right number of samples has been collected, and feed them to another thread that does the drawing. Your method should also work fine, since "read_many_sample" makes a blocking call.

I compute the timing through the number of samples acquired and I did not see glitches.
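A sketch of that callback pattern, under assumed names (device Dev1, channel ai0, a 1 kHz rate, and the function names are all mine). The `sample_timestamps` helper illustrates computing timing from the number of samples acquired rather than from the wall clock:

```python
import queue

def sample_timestamps(n_samples, rate, first_index=0):
    """Timestamps derived from the sample counter and the sample clock
    rate, instead of wall-clock time (which jitters)."""
    return [(first_index + i) / rate for i in range(n_samples)]

def start_callback_acquisition(device="Dev1", rate=1000.0, n=100):
    """Event-driven acquisition: DAQmx calls the callback every time
    `n` fresh samples are in the buffer; data is handed to a queue
    that a separate plotting thread consumes."""
    import numpy as np
    import nidaqmx  # imported here so the sketch loads without the driver installed
    from nidaqmx.constants import AcquisitionType
    from nidaqmx.stream_readers import AnalogMultiChannelReader

    data_queue = queue.Queue()
    task = nidaqmx.Task()
    task.ai_channels.add_ai_voltage_chan(device + "/ai0")
    task.timing.cfg_samp_clk_timing(rate, sample_mode=AcquisitionType.CONTINUOUS,
                                    samps_per_chan=n)
    reader = AnalogMultiChannelReader(task.in_stream)

    def on_samples(task_handle, event_type, num_samples, callback_data):
        buf = np.zeros((1, num_samples))
        reader.read_many_sample(buf, number_of_samples_per_channel=num_samples)
        data_queue.put(buf)   # plotting thread calls data_queue.get()
        return 0              # DAQmx requires the callback to return an int

    task.register_every_n_samples_acquired_into_buffer_event(n, on_samples)
    task.start()
    return task, data_queue
```

Because the card's sample clock paces the callback, timestamps built from the sample index are evenly spaced even if the Python threads run late.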

 

Is it a glitch between AO and AI? Between the various AIs? How do you know you have a glitch?

 

The only thing I can imagine is the buffer length, but in that case you would get errors indicating that your code is starved of data or that there is an overflow.

 

 

Message 6 of 6