05-08-2023 06:18 PM
This is likely a simple task, but I am new to the NI-DAQmx Python API, and I need some help figuring out how to read into an array after every write operation, performed at a fixed rate. What needs to happen is that, from an array of data, the program writes a value to an analog output channel, then reads a value from an analog input channel, and continues this until the entire output array has been written (with corresponding input array values). I've tried various things like changing the acquisition type, changing the buffer size, but I keep getting errors that prevent the script from running. Here is the relevant code, which is my most recent (and likely buggy) version:
Any and all help would be greatly appreciated.
05-09-2023 03:51 AM
What is your application? why do you need to read after every write?
BTW, I cannot edit the image and so does anyone else, please attach your python code.
05-09-2023 01:26 PM
I am writing DC voltages to an analog input channel, and reading back DC voltages from an analog input channel. The output data is a sine wave for an integer multiple of periods, and after every write I should be reading back and storing a voltage, and this process should be happening at a fixed frequency. Here is the paste of my code:
import os
import sys
import PID
import nidaqmx
import os.path
import numpy as np
from multiprocessing import Process
import matplotlib.animation as animation
from nidaqmx._task_modules.channels.ao_channel import AOChannel
from nidaqmx.constants import TerminalConfiguration, Edge, AcquisitionType
import nidaqmx.constants
from nidaqmx.stream_readers import AnalogSingleChannelReader
from nidaqmx.stream_writers import AnalogSingleChannelWriter
import nidaqmx.stream_writers
import math
system = nidaqmx.system.System.local()
devicename = system.devices[0].name
intask = nidaqmx.Task() # read task
outtask = nidaqmx.Task() # write task
InputSampleRate = 10000
OutputSampleRate = 10000
# Configure the output buffer, use 4 bytes for float numbers
buff_size = 4 * 1000
outtask.ao_channels.add_ao_voltage_chan('Dev130/ao0', min_val=-10, max_val=10, units=nidaqmx.constants.VoltageUnits.VOLTS)
outtask.timing.cfg_samp_clk_timing(rate=OutputSampleRate, active_edge=nidaqmx.constants.Edge.RISING,
sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS, samps_per_chan=1000)
outtask.triggers.start_trigger.cfg_anlg_edge_start_trig(trigger_source='APFI0')
#outtask.out_stream.buffer_size = buff_size
intask.ai_channels.add_ai_voltage_chan('Dev130/ai0', terminal_config=nidaqmx.constants.TerminalConfiguration.RSE,
min_val=-10, max_val=10, units=nidaqmx.constants.VoltageUnits.VOLTS)
intask.timing.cfg_samp_clk_timing(rate=InputSampleRate, active_edge=nidaqmx.constants.Edge.RISING,
sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS, samps_per_chan=1000)
# Set the sampling rate, frequency, and duration of the waveform
sampling_rate = 10000.0
frequency = 1000.0
duration = 1.0
# Generate the sine wave data
t = np.arange(0, duration, 1.0 / sampling_rate)
outdata = 3 * np.sin(2 * np.pi * frequency * t)
indata = np.zeros(len(outdata))
outtask.start()
indate = intask.read()
print('indata =', indata)
# Plot the generated and read waveforms
import matplotlib.pyplot as plt
fig, ax = plt.subplots(2, 1, figsize=(8, 6))
ax[0].plot(t, outdata)
ax[0].set_title('Generated Sine Wave')
ax[0].set_xlabel('Time (s)')
ax[0].set_ylabel('Voltage (V)')
ax[1].plot(t, indata)
ax[1].set_title('Read Waveform')
ax[1].set_xlabel('Time (s)')
ax[1].set_ylabel('Voltage (V)')
plt.tight_layout()
plt.show()
intask.close()
outtask.close()
05-09-2023 03:27 PM
I've been trying an updated approach to this, which is still not working as intended, but might be better in the long run:
import numpy as np
import sympy as sp
import nidaqmx as ni
import matplotlib.pyplot as plt
from nidaqmx.constants import TaskMode, Slope, Edge
import threading
def query_devices():
local_system = ni.system.System.local()
driver_version = local_system.driver_version
print('DAQmx {0}.{1}.{2}'.format(driver_version.major_version, driver_version.minor_version,
driver_version.update_version))
for device in local_system.devices:
print('Device Name: {0}, Product Category: {1}, Product Type: {2}'.format(
device.name, device.product_category, device.product_type))
def read_data(read_task, indata, nsamples):
read_task.start()
while True:
if len(indata) >= nsamples:
break
indata.extend(read_task.read(nsamples - len(indata), timeout=10))
read_task.stop()
read_task.close()
def iosync(data, samplerate=10000, input_mapping=['Dev129/ai0'], output_mapping=['Dev129/ao0']):
"""
Simultaneous reading and writing though NI device.
Parameters:
data: array_like, shape (nsamples, len(output_mapping))
Data to be send to output channels.
samplerate: int
The sampling rate
input_mapping: list of str
Input device channels
output_mapping: list of str
Output device channels
Returns:
indata, shape (nsamples, len(input_mapping))
Data from input channels
"""
outdata = np.asarray(data).T
nsamples = 1000
max_out_range = 5.0
max_in_range = 10
max_outdata = np.max(np.abs(outdata))
if max_outdata > max_out_range:
raise ValueError(
f"outdata amplitude ({max_outdata:.2f}) larger than allowed range"
f"(+-{max_out_range}).")
with ni.Task() as read_task, ni.Task() as write_task:
for o in output_mapping:
aochan = write_task.ao_channels.add_ao_voltage_chan(o)
aochan.ao_max = max_out_range
aochan.ao_min = -max_out_range
for i in input_mapping:
aichan = read_task.ai_channels.add_ai_voltage_chan(i)
aichan.ai_min = -max_in_range
aichan.ai_max = max_in_range
for task in (read_task, write_task):
task.timing.cfg_samp_clk_timing(rate=samplerate, source='OnboardClock',
samps_per_chan=nsamples)
write_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/PFI0',trigger_edge=Edge.RISING)
read_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/PFI0',trigger_edge=Edge.RISING)
#read_task.triggers.start_trigger.cfg_anlg_edge_start_trig(write_task.triggers.start_trigger.term, trigger_slope=Slope.RISING, trigger_level=0.1)
write_task.write(outdata, auto_start=False)
# Create a list to store the read data
indata = []
# Start a separate thread to continuously read data
read_thread = threading.Thread(target=read_data, args=(read_task, indata, nsamples))
read_thread.start()
# Start both tasks
write_task.start()
# Write the output data
write_task.write(outdata, auto_start=True)
# Wait for the read thread to complete
read_thread.join()
print('indata = ',indata)
return np.asarray(indata).T
query_devices()
# Set the sampling rate and duration of the waveform
sampling_rate = 10000.0
frequency = 100.0
duration = 1.0
# Generate the sine wave data
t = np.arange(0,duration,1/sampling_rate)
outdata = 3 * np.sin(2 * np.pi*frequency*t)
print('outdata = ', outdata)
print('sum of outdata = ', sum(outdata))
# Acquire the Data
indata = iosync(data = outdata)
fig, ax = plt.subplots(2, 1, figsize=(8, 6))
ax[0].plot(t, outdata)
ax[0].set_title('Generated Sine Wave')
ax[0].set_xlabel('Time (s)')
ax[0].set_ylabel('Voltage (V)')
ax[1].plot(t, indata)
ax[1].set_title('Read Waveform')
ax[1].set_xlabel('Time (s)')
ax[1].set_ylabel('Voltage (V)')
plt.tight_layout()
plt.show()
05-15-2023 09:30 PM
Sorry, I'm no help at all with the Python aspects of this.
But it *sounds* like you have some kind of stimulus-response app going on that probably calls for your AO and AI tasks to be sync'ed with a hardware clock.
My preference would be to generate that clock with a counter output task, so I can control the time between leading and trailing edge. Then I'd make the AO task generate samples on the leading edge while AI captures samples on the trailing edge.
Hopefully someone else more familiar with the Python API can help fill in the blanks here.
-Kevin P
05-23-2023 05:18 PM - edited 05-23-2023 05:20 PM
I've been doing some debugging, and I think I've pinpointed a part of the problem. The read_task is starting, but it may not be triggering. Is there some method that I can use to let me know what the status of the trigger is? I'm aware of 'is_task_done' but in my understanding this only indicates if the task has started/completed, not if it has triggered.
05-23-2023 05:31 PM - edited 05-23-2023 05:35 PM
Here is my most current code:
import numpy as np
import sympy as sp
import nidaqmx as ni
import matplotlib.pyplot as plt
from nidaqmx.constants import TaskMode, Slope, Edge
import threading
def query_devices():
local_system = ni.system.System.local()
driver_version = local_system.driver_version
print('DAQmx {0}.{1}.{2}'.format(driver_version.major_version, driver_version.minor_version,
driver_version.update_version))
for device in local_system.devices:
print('Device Name: {0}, Product Category: {1}, Product Type: {2}'.format(
device.name, device.product_category, device.product_type))
def read_data(read_task, indata, nsamples):
read_task.start()
print('nsamples = ', nsamples,'and len(indata) = ', len(indata))
print('read_data check 1')
while True:
if len(indata) >= nsamples:
print('Now True')
break
print('Still False')
print('indata loop =', indata)
# SOMETHING IS HAPPENING HERE THAT PREVENTS THE LOOP FROM COMPLETING, AND IT TIMES OUT. Perhas the task hasn't triggered?
# Check if the task has triggered
if not read_task.is_task_done():
print("Task has started.")
else:
print("Task has not started yet or has completed.")
indata.extend(read_task.read(timeout=10))
print('read_data check 2')
# Debugging
print('indata = ', indata)
read_task.stop()
read_task.close()
def iosync(data, samplerate=10000, input_mapping=['Dev129/ai0'], output_mapping=['Dev129/ao0']):
"""
Simultaneous reading and writing though NI device.
Parameters:
data: array_like, shape (nsamples, len(output_mapping))
Data to be send to output channels.
samplerate: int
The sampling rate
input_mapping: list of str
Input device channels
output_mapping: list of str
Output device channels
Returns:
indata, shape (nsamples, len(input_mapping))
Data from input channels
"""
outdata = np.asarray(data).T
nsamples = 1000
max_out_range = 5.0
max_in_range = 10
max_outdata = np.max(np.abs(outdata))
if max_outdata > max_out_range:
raise ValueError(
f"outdata amplitude ({max_outdata:.2f}) larger than allowed range"
f"(+-{max_out_range}).")
# Create write and read tasks, voltage ranges are specific to the NI device in use
with ni.Task() as read_task, ni.Task() as write_task:
for o in output_mapping:
aochan = write_task.ao_channels.add_ao_voltage_chan(o)
aochan.ao_max = max_out_range
aochan.ao_min = -max_out_range
for i in input_mapping:
aichan = read_task.ai_channels.add_ai_voltage_chan(i)
aichan.ai_min = -max_in_range
aichan.ai_max = max_in_range
# Specify task timing parameters
for task in (read_task, write_task):
task.timing.cfg_samp_clk_timing(rate=samplerate, source='OnboardClock',
samps_per_chan=nsamples)
# Set both tasks to trigger on the rising edge of the internal clock, thereby synchronizing them
write_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/PFI0',trigger_edge=Edge.RISING)
read_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/PFI0',trigger_edge=Edge.FALLING)
#read_task.triggers.start_trigger.cfg_anlg_edge_start_trig(write_task.triggers.start_trigger.term, trigger_slope=Slope.RISING, trigger_level=0.1)
write_task.write(outdata, auto_start=False)
# Create a list to store the read data
indata = []
# Start a separate thread to continuously read data
read_thread = threading.Thread(target=read_data, args=(read_task, indata, nsamples))
print('Checkpoint 1')
read_thread.start()
print('Checkpoint 2')
# Start both tasks
write_task.start()
print('Checkpoint 3')
# Write the output data
write_task.write(outdata, auto_start=True)
print('Checkpoint 4')
# Wait for the read thread to complete
read_thread.join()
print('Checkpoint 5')
print('indata = ',indata)
return np.asarray(indata).T
query_devices()
# Set the sampling rate and duration of the waveform
sampling_rate = 1000.0
frequency = 100.0
duration = 1.0
# Generate the sine wave data
t = np.arange(0,duration,1/sampling_rate)
outdata = 3 * np.sin(2 * np.pi*frequency*t)
print('outdata = ', outdata)
print('sum of outdata = ', sum(outdata))
# Acquire the data
indata = iosync(data = outdata)
fig, ax = plt.subplots(2, 1, figsize=(8, 6))
ax[0].plot(t, outdata)
ax[0].set_title('Generated Sine Wave')
ax[0].set_xlabel('Time (s)')
ax[0].set_ylabel('Voltage (V)')
ax[1].plot(t, indata)
ax[1].set_title('Read Waveform')
ax[1].set_xlabel('Time (s)')
ax[1].set_ylabel('Voltage (V)')
plt.tight_layout()
plt.show()
When I run this, Checkpoints 1 - 4 are printed, as is 'read_data check 1,' 'Still False,' 'indata loop = [],' and 'Task has started,' however it is at this point that the program hangs until timeout, without ever printing 'read_data check 2.' The problem appears to be occurring at the line with indata.extend(read_task.read(timeout=10)), which I suspect is being caused by the read() not triggering. At this point, I get 'Exception in thread Thread-1 (read_data):' followed by a bunch of errors, the most useful of which is the segment:
nidaqmx.errors.DaqReadError: Some or all of the samples requested have not yet been acquired.
To wait for the samples to become available use a longer read timeout or read later in your program. To make the samples available sooner, increase the sample rate. If your task uses a start trigger, make sure that your start trigger is configured correctly. It is also possible that you configured the task for external timing, and no clock was supplied. If this is the case, supply an external clock.
Property: DAQmx_Read_RelativeTo
Corresponding Value: DAQmx_Val_CurrReadPos
Property: DAQmx_Read_Offset
Corresponding Value: 0
Task Name: _unnamedTask<0>
Status Code: -200284
Checkpoint 5
indata = []
05-24-2023 07:32 AM
Again, I can't give any detailed help on the Python syntax, but here are some general ideas and tips.
1. You probably don't need to (and therefore shouldn't) configure the tasks to be triggered. It will be sufficient to have the tasks share a *sample clock*, provided the task that sources it is started last.
2. The first and relatively easy way to do it is to set up one of your tasks, let's say AO, to continue to configure its timing to use "OnboardClock" for sampling. But then the AI task should configure its timing to use something like "/Dev1/ao/SampleClock" for sampling. Then the AI task should be started *before* the AO task. (The internal "...ao/SampleClock" signal doesn't start pulsing until you start the AO task and it stops when the AO task stops.)
3. I usually prefer a method where I use a counter task to generate a pulse train that *both* AI and AO use as their sample clock. This makes it easier to control the timing between the AO stimulus and the AI response measurement if you configure the AO task to use the pulse's *leading* edge (probably rising?) and the AI task to use the same pulse's *trailing* edge (probably falling?) for sampling.
Like before, it'll then be important to start the counter task last. The syntax for the sample clock source would look similar to "/Dev1/Ctr0InternalOutput".
4. When you use a hardware clock and an output buffer for AO tasks, you should write data to the task *before* sampling starts (so typically before you call DAQmx Start). In your code below, it appears that you start before you write.
-Kevin P
05-30-2023 07:09 PM
Many thanks for your help Kevin, I've been attempting to implement your suggestions, and here is what I have thus far:
import numpy as np
import sympy as sp
import nidaqmx as ni
import matplotlib.pyplot as plt
from nidaqmx.constants import TaskMode, Slope, Edge, TriggerType, FrequencyUnits
import threading
import time
from nidaqmx.stream_readers import AnalogSingleChannelReader
from nidaqmx.stream_writers import AnalogSingleChannelWriter
class NiJob:
def __init__(self,sampling_rate,sin_freq,duration,debug=False):
self.sampling_rate=sampling_rate
self.sin_freq=sin_freq
self.duration=duration
# Produce the data to be written by the DAQ to output
self.t=np.arange(0, self.duration, 1 / self.sampling_rate)
self.outdata = 0.5 * np.sin(2 * np.pi * self.sin_freq * self.t)
self.debug=debug
# Ensure that the array used to store data is the same size as the output array
self.indata=np.zeros(len(self.outdata))
self.final_indata=None
print('At initialization, length of outdata is', len(self.outdata), 'and length of indata is', len(self.indata))
# Produces a list of all NI devices connected to the system and their IDs
def query_devices(self):
if self.debug:
local_system = ni.system.System.local()
driver_version = local_system.driver_version
print('DAQmx {0}.{1}.{2}'.format(driver_version.major_version, driver_version.minor_version,
driver_version.update_version))
for device in local_system.devices:
print('Device Name: {0}, Product Category: {1}, Product Type: {2}'.format(
device.name, device.product_category, device.product_type))
else:
raise Exception('Enable debug to run query_devices')
# Produce lists of the specified channel/terminal types on the device in use
def list_available_channels(self):
device = ni.system.device.Device("Dev129")
print('List of Device Terminals:')
for tr in device.terminals:
print(tr)
print('List of Input Physical Channels:')
for tr in device.ai_physical_chans:
print(tr)
print('List of Counter Output Channels:')
for tr in device.co_physical_chans:
print(tr)
def iosync(self, samplerate=1000.0,max_out_range=10.0,max_in_range=10.0, \
in_chan='Dev129/ai0', out_chan='Dev129/ao0'):
"""
Simultaneous reading and writing though NI device.
Parameters:
samplerate: float
The sampling rate
max_out_range: float
maximum voltage that the device can produce as output
max_in_range: float
maximum voltage that the device can take as input
in_chan: str
the analog input channel
out_chan: str
the analog output channel
Returns:
indata
"""
# Create a Counter Task to be used for the trigger
# *(been having issues with correct channel name)*
c_task = ni.Task()
c_task.co_channels.add_co_pulse_chan_freq(counter = 'Dev129/ctr0', units=FrequencyUnits.HZ, initial_delay=0.0, freq=1000, duty_cycle=0.5)
max_out_range = 10.0
# Prevent outputs exceeding hardware limit
outdata = np.asarray(self.outdata).T
max_outdata = np.max(np.abs(outdata))
if max_outdata > max_out_range:
raise ValueError(
f"outdata amplitude ({max_outdata:.2f}) larger than allowed range"
f"(+-{max_out_range}).")
# Create write and read tasks
write_task = ni.Task()
read_task = ni.Task()
write_task.ao_channels.add_ao_voltage_chan(out_chan)
read_task.ai_channels.add_ai_voltage_chan(in_chan)
# Configure the clock used by the read and write task, use digital pulse train from counter task
read_task.timing.cfg_samp_clk_timing(rate=1000.0, source = '/Dev129/Ctr0InternalOutput')
write_task.timing.cfg_samp_clk_timing(rate=1000.0, source='/Dev129/Ctr0InternalOutput')
# An attempt at using stream writers/readers, had issues so left it alone for now
#a_writer = ni.stream_writers.AnalogSingleChannelWriter(write_task.out_stream)
#a_writer.write_many_sample(outdata,timeout=10.0)
#a_reader = ni.stream_readers.AnalogSingleChannelReader(read_task.in_stream)
#a_reader.read_many_sample(self.indata,len(outdata),timeout=10.0)
# Write and read made ready, intialized by the counter task starting
# *or at least that's the idea, but read_task.read is currently timing out*
write_task.write(outdata,timeout=5.0)
print('length of outdata is', len(outdata), 'and length of indata is', len(self.indata))
self.indata = np.asarray(read_task.read(number_of_samples_per_channel=len(outdata),timeout=5.0))
print('Checkpoint 1')
print('length of outdata is', len(outdata), 'and length of indata is', len(self.indata))
write_task.start()
read_task.start()
c_task.start()
if self.debug:
print('Checkpoint 3')
# Clean-up
write_task.stop()
write_task.close()
read_task.stop()
read_task.close()
c_task.stop()
c_task.close()
if self.debug:
print('Checkpoint 4')
#if self.debug:
# print('indata = ', self.indata)
self.final_indata = np.asarray(self.indata).T
print('indata = ', self.indata)
print('length of outdata is ', len(self.outdata), 'and length of indata is ', len(self.final_indata))
# Plotting
def plot_result(self,figsize=(8,6)):
fig, ax = plt.subplots(2, 1, figsize=figsize)
ax[0].plot(self.t, self.outdata)
ax[0].set_title('Generated Sine Wave')
ax[0].set_xlabel('Time (s)')
ax[0].set_ylabel('Voltage (V)')
ax[1].plot(self.t,self.final_indata.T)
ax[1].set_title('Read Waveform')
ax[1].set_xlabel('Time (s)')
ax[1].set_ylabel('Voltage (V)')
plt.tight_layout()
plt.show()
#To run this:
sampling_rate = 1000.0
sin_freq = 100.0
duration = 1.0
job = NiJob(sampling_rate=sampling_rate,sin_freq=sin_freq,duration=duration,debug=True)
job.query_devices() #Will only work if debug=True
job.list_available_channels()
graph_indata=job.iosync()
job.plot_result()
So basically, I've tried to generate a counter task, give it a frequency of 1000 Hz, and use this as the source clock within the cfg_samp_clk_timing method for the read and write tasks. However, what's currently happening is that the read operation on line 108 is timing out:
self.indata = np.asarray(read_task.read(number_of_samples_per_channel=len(outdata),timeout=5.0))
Any ideas on why this is happening or ways to troubleshoot the cause are much appreciated. Also, should I be configuring the start trigger if I use a counter? Thanks again!
05-31-2023 03:53 PM
Also, I've tried adding the following lines, but it's had no noticeable effect, the read operation still times out.
# Configure the trigger for the counter task
c_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='100kHzTimebase', trigger_edge=Edge.RISING)
# Configure the trigger for the write task
write_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/Ctr0InternalOutput', trigger_edge=Edge.RISING)
# Configure the trigger for the read task
read_task.triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source='/Dev129/Ctr0InternalOutput', trigger_edge=Edge.RISING)