Multifunction DAQ

cancel
Showing results for 
Search instead for 
Did you mean: 

Read after every Write at a fixed rate with a USB 6361

This is likely a simple task, but I am new to the NI-DAQmx Python API, and I need some help figuring out how to read into an array after every write operation, performed at a fixed rate. What needs to happen is that, from an array of data, the program writes a value to an analog output channel, then reads a value from an analog input channel, and continues this until the entire output array has been written (with corresponding input array values). I've tried various things like changing the acquisition type, changing the buffer size, but I keep getting errors that prevent the script from running. Here is the relevant code, which is my most recent (and likely buggy) version:
Code Test.PNG
Any and all help would be greatly appreciated.

0 Kudos
Message 1 of 11
(1,890 Views)

What is your application? why do you need to read after every write?

 

BTW, I cannot edit the image and so does anyone else, please attach your python code.

Santhosh
Soliton Technologies

New to the forum? Please read community guidelines and how to ask smart questions

Only two ways to appreciate someone who spent their free time to reply/answer your question - give them Kudos or mark their reply as the answer/solution.

Finding it hard to source NI hardware? Try NI Trading Post
0 Kudos
Message 2 of 11
(1,864 Views)

I am writing DC voltages to an analog input channel, and reading back DC voltages from an analog input channel. The output data is a sine wave for an integer multiple of periods, and after every write I should be reading back and storing a voltage, and this process should be happening at a fixed frequency. Here is the paste of my code:

import os
import sys
import PID
import nidaqmx
import os.path
import numpy as np
from multiprocessing import Process
import matplotlib.animation as animation
from nidaqmx._task_modules.channels.ao_channel import AOChannel
from nidaqmx.constants import TerminalConfiguration, Edge, AcquisitionType
import nidaqmx.constants
from nidaqmx.stream_readers import AnalogSingleChannelReader
from nidaqmx.stream_writers import AnalogSingleChannelWriter
import nidaqmx.stream_writers
import math

system = nidaqmx.system.System.local()
devicename = system.devices[0].name

intask = nidaqmx.Task() # read task
outtask = nidaqmx.Task() # write task

InputSampleRate = 10000
OutputSampleRate = 10000

# Configure the output buffer, use 4 bytes for float numbers
buff_size = 4 * 1000

outtask.ao_channels.add_ao_voltage_chan('Dev130/ao0', min_val=-10, max_val=10, units=nidaqmx.constants.VoltageUnits.VOLTS)

outtask.timing.cfg_samp_clk_timing(rate=OutputSampleRate, active_edge=nidaqmx.constants.Edge.RISING,
sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS, samps_per_chan=1000)

outtask.triggers.start_trigger.cfg_anlg_edge_start_trig(trigger_source='APFI0')
#outtask.out_stream.buffer_size = buff_size

intask.ai_channels.add_ai_voltage_chan('Dev130/ai0', terminal_config=nidaqmx.constants.TerminalConfiguration.RSE,
min_val=-10, max_val=10, units=nidaqmx.constants.VoltageUnits.VOLTS)

intask.timing.cfg_samp_clk_timing(rate=InputSampleRate, active_edge=nidaqmx.constants.Edge.RISING,
sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS, samps_per_chan=1000)

# Set the sampling rate, frequency, and duration of the waveform
sampling_rate = 10000.0
frequency = 1000.0
duration = 1.0

# Generate the sine wave data
t = np.arange(0, duration, 1.0 / sampling_rate)
outdata = 3 * np.sin(2 * np.pi * frequency * t)
indata = np.zeros(len(outdata))

outtask.start()
indate = intask.read()

print('indata =', indata)

# Plot the generated and read waveforms
import matplotlib.pyplot as plt
fig, ax = plt.subplots(2, 1, figsize=(8, 6))
ax[0].plot(t, outdata)
ax[0].set_title('Generated Sine Wave')
ax[0].set_xlabel('Time (s)')
ax[0].set_ylabel('Voltage (V)')
ax[1].plot(t, indata)
ax[1].set_title('Read Waveform')
ax[1].set_xlabel('Time (s)')
ax[1].set_ylabel('Voltage (V)')
plt.tight_layout()
plt.show()

intask.close()
outtask.close()

 

0 Kudos
Message 3 of 11
(1,848 Views)

I've been trying an updated approach to this, which is still not working as intended, but might be better in the long run:

import numpy as np
import sympy as sp
import nidaqmx as ni
import matplotlib.pyplot as plt
from nidaqmx.constants import TaskMode, Slope, Edge
import threading

def query_devices():
    local_system = ni.system.System.local()
    driver_version = local_system.driver_version

    print('DAQmx {0}.{1}.{2}'.format(driver_version.major_version, driver_version.minor_version,
                                    driver_version.update_version))

    for device in local_system.devices:
        print('Device Name: {0}, Product Category: {1}, Product Type: {2}'.format(
            device.name, device.product_category, device.product_type))

def read_data(read_task, indata, nsamples):
    read_task.start()
    while True:
        if len(indata) >= nsamples:
            break
        indata.extend(read_task.read(nsamples - len(indata), timeout=10))
    read_task.stop()
    read_task.close()

def iosync(data, samplerate=10000, input_mapping=['Dev129/ai0'], output_mapping=['Dev129/ao0']):
  """
  Simultaneous reading and writing though NI device.

  Parameters:
  data: array_like, shape (nsamples, len(output_mapping))
    Data to be send to output channels.  
  samplerate: int
    The sampling rate
  input_mapping: list of str
    Input device channels
  output_mapping: list of str
    Output device channels

  Returns:
  indata, shape (nsamples, len(input_mapping))
    Data from input channels
  """
  outdata = np.asarray(data).T
  nsamples = 1000

  max_out_range = 5.0
  max_in_range = 10
  max_outdata = np.max(np.abs(outdata))
  if max_outdata > max_out_range:
    raise ValueError(
      f"outdata amplitude ({max_outdata:.2f}) larger than allowed range"
      f"(+-{max_out_range}).")



  with ni.Task() as read_task, ni.Task() as write_task:
    for o in output_mapping:
      aochan = write_task.ao_channels.add_ao_voltage_chan(o)
      aochan.ao_max = max_out_range
      aochan.ao_min = -max_out_range
    for i in input_mapping:
      aichan = read_task.ai_channels.add_ai_voltage_chan(i)
      aichan.ai_min = -max_in_range
      aichan.ai_max = max_in_range

    for task in (read_task, write_task):
        task.timing.cfg_samp_clk_timing(rate=samplerate, source='OnboardClock',
                                        samps_per_chan=nsamples)
    
    write_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/PFI0',trigger_edge=Edge.RISING)
    read_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/PFI0',trigger_edge=Edge.RISING)
    #read_task.triggers.start_trigger.cfg_anlg_edge_start_trig(write_task.triggers.start_trigger.term, trigger_slope=Slope.RISING, trigger_level=0.1)
    write_task.write(outdata, auto_start=False)

    # Create a list to store the read data
    indata = []

    # Start a separate thread to continuously read data
    read_thread = threading.Thread(target=read_data, args=(read_task, indata, nsamples))
    read_thread.start()

    # Start both tasks
    write_task.start()

    # Write the output data
    write_task.write(outdata, auto_start=True)

    # Wait for the read thread to complete
    read_thread.join()
    

  print('indata = ',indata)

  return np.asarray(indata).T

query_devices()

# Set the sampling rate and duration of the waveform
sampling_rate = 10000.0
frequency = 100.0
duration = 1.0

# Generate the sine wave data
t = np.arange(0,duration,1/sampling_rate)
outdata = 3 * np.sin(2 * np.pi*frequency*t)
print('outdata = ', outdata)
print('sum of outdata = ', sum(outdata))

# Acquire the Data
indata = iosync(data = outdata)

fig, ax = plt.subplots(2, 1, figsize=(8, 6))
ax[0].plot(t, outdata)
ax[0].set_title('Generated Sine Wave')
ax[0].set_xlabel('Time (s)')
ax[0].set_ylabel('Voltage (V)')
ax[1].plot(t, indata)
ax[1].set_title('Read Waveform')
ax[1].set_xlabel('Time (s)')
ax[1].set_ylabel('Voltage (V)')
plt.tight_layout()
plt.show()
0 Kudos
Message 4 of 11
(1,833 Views)

Sorry, I'm no help at all with the Python aspects of this.

 

But it *sounds* like you have some kind of stimulus-response app going on that probably calls for your AO and AI tasks to be sync'ed with a hardware clock.

 

My preference would be to generate that clock with a counter output task, so I can control the time between leading and trailing edge.  Then I'd make the AO task generate samples on the leading edge while AI captures samples on the trailing edge.  

 

Hopefully someone else more familiar with the Python API can help fill in the blanks here.

 

 

-Kevin P

ALERT! LabVIEW's subscription-only policy came to an end (finally!). Unfortunately, pricing favors the captured and committed over new adopters -- so tread carefully.
0 Kudos
Message 5 of 11
(1,779 Views)

I've been doing some debugging, and I think I've pinpointed a part of the problem. The read_task is starting, but it may not be triggering. Is there some method that I can use to let me know what the status of the trigger is? I'm aware of 'is_task_done' but in my understanding this only indicates if the task has started/completed, not if it has triggered.

0 Kudos
Message 6 of 11
(1,749 Views)

Here is my most current code:

 

import numpy as np
import sympy as sp
import nidaqmx as ni
import matplotlib.pyplot as plt
from nidaqmx.constants import TaskMode, Slope, Edge
import threading

def query_devices():
    local_system = ni.system.System.local()
    driver_version = local_system.driver_version

    print('DAQmx {0}.{1}.{2}'.format(driver_version.major_version, driver_version.minor_version,
                                    driver_version.update_version))

    for device in local_system.devices:
        print('Device Name: {0}, Product Category: {1}, Product Type: {2}'.format(
            device.name, device.product_category, device.product_type))

def read_data(read_task, indata, nsamples):
    read_task.start()
    print('nsamples = ', nsamples,'and len(indata) = ', len(indata))
    print('read_data check 1')
    while True:
        if len(indata) >= nsamples:
            print('Now True')
            break
        print('Still False')
        print('indata loop =', indata)
        # SOMETHING IS HAPPENING HERE THAT PREVENTS THE LOOP FROM COMPLETING, AND IT TIMES OUT. Perhas the task hasn't triggered?
        # Check if the task has triggered
        if not read_task.is_task_done():
            print("Task has started.")
        else:
            print("Task has not started yet or has completed.")


        indata.extend(read_task.read(timeout=10))
        print('read_data check 2')
    # Debugging
    print('indata = ', indata)
    read_task.stop()
    read_task.close()

def iosync(data, samplerate=10000, input_mapping=['Dev129/ai0'], output_mapping=['Dev129/ao0']):
  """
  Simultaneous reading and writing though NI device.

  Parameters:
  data: array_like, shape (nsamples, len(output_mapping))
    Data to be send to output channels.  
  samplerate: int
    The sampling rate
  input_mapping: list of str
    Input device channels
  output_mapping: list of str
    Output device channels

  Returns:
  indata, shape (nsamples, len(input_mapping))
    Data from input channels
  """
  outdata = np.asarray(data).T
  nsamples = 1000

  max_out_range = 5.0
  max_in_range = 10
  max_outdata = np.max(np.abs(outdata))
  if max_outdata > max_out_range:
    raise ValueError(
      f"outdata amplitude ({max_outdata:.2f}) larger than allowed range"
      f"(+-{max_out_range}).")



  # Create write and read tasks, voltage ranges are specific to the NI device in use
  with ni.Task() as read_task, ni.Task() as write_task:
    for o in output_mapping:
      aochan = write_task.ao_channels.add_ao_voltage_chan(o)
      aochan.ao_max = max_out_range
      aochan.ao_min = -max_out_range
    for i in input_mapping:
      aichan = read_task.ai_channels.add_ai_voltage_chan(i)
      aichan.ai_min = -max_in_range
      aichan.ai_max = max_in_range

    # Specify task timing parameters
    for task in (read_task, write_task):
        task.timing.cfg_samp_clk_timing(rate=samplerate, source='OnboardClock',
                                        samps_per_chan=nsamples)
    
    # Set both tasks to trigger on the rising edge of the internal clock, thereby synchronizing them
    write_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/PFI0',trigger_edge=Edge.RISING)
    read_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/PFI0',trigger_edge=Edge.FALLING)
    #read_task.triggers.start_trigger.cfg_anlg_edge_start_trig(write_task.triggers.start_trigger.term, trigger_slope=Slope.RISING, trigger_level=0.1)
    write_task.write(outdata, auto_start=False)

    # Create a list to store the read data
    indata = []

    # Start a separate thread to continuously read data
    read_thread = threading.Thread(target=read_data, args=(read_task, indata, nsamples))
    print('Checkpoint 1')
    read_thread.start()
    print('Checkpoint 2')

    # Start both tasks
    write_task.start()
    print('Checkpoint 3')

    # Write the output data
    write_task.write(outdata, auto_start=True)
    print('Checkpoint 4')

    # Wait for the read thread to complete
    read_thread.join()
    print('Checkpoint 5')
    

  print('indata = ',indata)

  return np.asarray(indata).T

query_devices()

# Set the sampling rate and duration of the waveform
sampling_rate = 1000.0
frequency = 100.0
duration = 1.0

# Generate the sine wave data
t = np.arange(0,duration,1/sampling_rate)
outdata = 3 * np.sin(2 * np.pi*frequency*t)
print('outdata = ', outdata)
print('sum of outdata = ', sum(outdata))

# Acquire the data
indata = iosync(data = outdata)

fig, ax = plt.subplots(2, 1, figsize=(8, 6))
ax[0].plot(t, outdata)
ax[0].set_title('Generated Sine Wave')
ax[0].set_xlabel('Time (s)')
ax[0].set_ylabel('Voltage (V)')
ax[1].plot(t, indata)
ax[1].set_title('Read Waveform')
ax[1].set_xlabel('Time (s)')
ax[1].set_ylabel('Voltage (V)')
plt.tight_layout()
plt.show()

 

 
When I run this, Checkpoints 1 - 4 are printed, as is 'read_data check 1,' 'Still False,' 'indata loop = [],' and 'Task has started,' however it is at this point that the program hangs until timeout, without ever printing 'read_data check 2.' The problem appears to be occurring at the line with indata.extend(read_task.read(timeout=10)), which I suspect is being caused by the read() not triggering. At this point, I get 'Exception in thread Thread-1 (read_data):' followed by a bunch of errors, the most useful of which is the segment:

nidaqmx.errors.DaqReadError: Some or all of the samples requested have not yet been acquired.
To wait for the samples to become available use a longer read timeout or read later in your program. To make the samples available sooner, increase the sample rate. If your task uses a start trigger, make sure that your start trigger is configured correctly. It is also possible that you configured the task for external timing, and no clock was supplied. If this is the case, supply an external clock.
Property: DAQmx_Read_RelativeTo
Corresponding Value: DAQmx_Val_CurrReadPos
Property: DAQmx_Read_Offset
Corresponding Value: 0

Task Name: _unnamedTask<0>

Status Code: -200284
Checkpoint 5
indata = []

0 Kudos
Message 7 of 11
(1,745 Views)

Again, I can't give any detailed help on the Python syntax, but here are some general ideas and tips.

 

1. You probably don't need to (and therefore shouldn't) configure the tasks to be triggered.  It will be sufficient to have the tasks share a *sample clock*, provided the task that sources it is started last.

 

2. The first and relatively easy way to do it is to set up one of your tasks, let's say AO, to continue to configure its timing to use "OnboardClock" for sampling.  But then the AI task should configure its timing to use something like "/Dev1/ao/SampleClock" for sampling.  Then the AI task should be started *before* the AO task.  (The internal "...ao/SampleClock" signal doesn't start pulsing until you start the AO task and it stops when the AO task stops.)

 

3. I usually prefer a method where I use a counter task to generate a pulse train that *both* AI and AO use as their sample clock.  This makes it easier to control the timing between the AO stimulus and the AI response measurement if you configure the AO task to use the pulse's *leading* edge (probably rising?) and the AI task to use the same pulse's *trailing* edge (probably falling?) for sampling.

   Like before, it'll then be important to start the counter task last.  The syntax for the sample clock source would look similar to "/Dev1/Ctr0InternalOutput".

 

4. When you use a hardware clock and an output buffer for AO tasks, you should write data to the task *before* sampling starts (so typically before you call DAQmx Start).  In your code below, it appears that you start before you write.

 

 

-Kevin P

ALERT! LabVIEW's subscription-only policy came to an end (finally!). Unfortunately, pricing favors the captured and committed over new adopters -- so tread carefully.
0 Kudos
Message 8 of 11
(1,700 Views)

Many thanks for your help Kevin, I've been attempting to implement your suggestions, and here is what I have thus far:

import numpy as np
import sympy as sp
import nidaqmx as ni
import matplotlib.pyplot as plt
from nidaqmx.constants import TaskMode, Slope, Edge, TriggerType, FrequencyUnits
import threading
import time
from nidaqmx.stream_readers import AnalogSingleChannelReader
from nidaqmx.stream_writers import AnalogSingleChannelWriter

class NiJob:
    def __init__(self,sampling_rate,sin_freq,duration,debug=False):
        self.sampling_rate=sampling_rate
        self.sin_freq=sin_freq
        self.duration=duration
        # Produce the data to be written by the DAQ to output
        self.t=np.arange(0, self.duration, 1 / self.sampling_rate)
        self.outdata = 0.5 * np.sin(2 * np.pi * self.sin_freq * self.t)
        self.debug=debug
        # Ensure that the array used to store data is the same size as the output array
        self.indata=np.zeros(len(self.outdata))
        self.final_indata=None
        print('At initialization, length of outdata is', len(self.outdata), 'and length of indata is', len(self.indata))

    # Produces a list of all NI devices connected to the system and their IDs
    def query_devices(self):
        if self.debug:
            local_system = ni.system.System.local()
            driver_version = local_system.driver_version


            print('DAQmx {0}.{1}.{2}'.format(driver_version.major_version, driver_version.minor_version,
                                             driver_version.update_version))

            for device in local_system.devices:
                print('Device Name: {0}, Product Category: {1}, Product Type: {2}'.format(
                    device.name, device.product_category, device.product_type))
        else:
            raise Exception('Enable debug to run query_devices')

    # Produce lists of the specified channel/terminal types on the device in use
    def list_available_channels(self):
        device = ni.system.device.Device("Dev129")
        print('List of Device Terminals:')
        for tr in device.terminals:
            print(tr)
        print('List of Input Physical Channels:')
        for tr in device.ai_physical_chans:
            print(tr)
        print('List of Counter Output Channels:')
        for tr in device.co_physical_chans:
            print(tr)

    def iosync(self, samplerate=1000.0,max_out_range=10.0,max_in_range=10.0, \
               in_chan='Dev129/ai0', out_chan='Dev129/ao0'):
        """
        Simultaneous reading and writing though NI device.

        Parameters:
        samplerate: float
          The sampling rate
        max_out_range: float
          maximum voltage that the device can produce as output
        max_in_range: float
          maximum voltage that the device can take as input
        in_chan: str
          the analog input channel
        out_chan: str
          the analog output channel

        Returns:
        indata
        """
        # Create a Counter Task to be used for the trigger 
        # *(been having issues with correct channel name)*
        c_task = ni.Task()
        c_task.co_channels.add_co_pulse_chan_freq(counter = 'Dev129/ctr0', units=FrequencyUnits.HZ, initial_delay=0.0, freq=1000, duty_cycle=0.5)
        max_out_range = 10.0

        # Prevent outputs exceeding hardware limit
        outdata = np.asarray(self.outdata).T
        max_outdata = np.max(np.abs(outdata))
        if max_outdata > max_out_range:
            raise ValueError(
                f"outdata amplitude ({max_outdata:.2f}) larger than allowed range"
                f"(+-{max_out_range}).")

        # Create write and read tasks
        write_task = ni.Task()
        read_task = ni.Task()
        write_task.ao_channels.add_ao_voltage_chan(out_chan)
        read_task.ai_channels.add_ai_voltage_chan(in_chan)

        # Configure the clock used by the read and write task, use digital pulse train from counter task
        read_task.timing.cfg_samp_clk_timing(rate=1000.0, source = '/Dev129/Ctr0InternalOutput')
        write_task.timing.cfg_samp_clk_timing(rate=1000.0, source='/Dev129/Ctr0InternalOutput')

        # An attempt at using stream writers/readers, had issues so left it alone for now
        #a_writer = ni.stream_writers.AnalogSingleChannelWriter(write_task.out_stream)
        #a_writer.write_many_sample(outdata,timeout=10.0)
        #a_reader = ni.stream_readers.AnalogSingleChannelReader(read_task.in_stream)
        #a_reader.read_many_sample(self.indata,len(outdata),timeout=10.0)
        
        # Write and read made ready, intialized by the counter task starting
        # *or at least that's the idea, but read_task.read is currently timing out*
        write_task.write(outdata,timeout=5.0)
        print('length of outdata is', len(outdata), 'and length of indata is', len(self.indata))
        self.indata = np.asarray(read_task.read(number_of_samples_per_channel=len(outdata),timeout=5.0))
        print('Checkpoint 1')
        print('length of outdata is', len(outdata), 'and length of indata is', len(self.indata))
        write_task.start()
        read_task.start()
        c_task.start()

        if self.debug:
            print('Checkpoint 3')

        # Clean-up
        write_task.stop()
        write_task.close()
        read_task.stop()
        read_task.close()
        c_task.stop()
        c_task.close()

        if self.debug:
            print('Checkpoint 4')
        #if self.debug:
        #    print('indata = ', self.indata)

        self.final_indata = np.asarray(self.indata).T
        print('indata = ', self.indata)
        print('length of outdata is ', len(self.outdata), 'and length of indata is ', len(self.final_indata))

    # Plotting
    def plot_result(self,figsize=(8,6)):
        fig, ax = plt.subplots(2, 1, figsize=figsize)
        ax[0].plot(self.t, self.outdata)
        ax[0].set_title('Generated Sine Wave')
        ax[0].set_xlabel('Time (s)')
        ax[0].set_ylabel('Voltage (V)')
        ax[1].plot(self.t,self.final_indata.T)
        ax[1].set_title('Read Waveform')
        ax[1].set_xlabel('Time (s)')
        ax[1].set_ylabel('Voltage (V)')
        plt.tight_layout()
        plt.show()


#To run this:
sampling_rate = 1000.0
sin_freq = 100.0
duration = 1.0

job = NiJob(sampling_rate=sampling_rate,sin_freq=sin_freq,duration=duration,debug=True)
job.query_devices() #Will only work if debug=True
job.list_available_channels()
graph_indata=job.iosync()
job.plot_result()

So basically, I've tried to generate a counter task, give it a frequency of 1000 Hz, and use this as the source clock within the cfg_samp_clk_timing method for the read and write tasks. However, what's currently happening is that the read operation on line 108 is timing out:

self.indata = np.asarray(read_task.read(number_of_samples_per_channel=len(outdata),timeout=5.0))

Any ideas on why this is happening or ways to troubleshoot the cause are much appreciated. Also, should I be configuring the start trigger if I use a counter? Thanks again!

0 Kudos
Message 9 of 11
(1,645 Views)

Also, I've tried adding the following lines, but it's had no noticeable effect, the read operation still times out.

# Configure the trigger for the counter task
        c_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='100kHzTimebase', trigger_edge=Edge.RISING)
# Configure the trigger for the write task
        write_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/Ctr0InternalOutput', trigger_edge=Edge.RISING)

# Configure the trigger for the read task
        read_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/Ctr0InternalOutput', trigger_edge=Edge.RISING)
0 Kudos
Message 10 of 11
(1,632 Views)