04-22-2024 02:53 AM
Hi,
I'm working on a Python script that uses the nidaqmx package to read analog channels from an NI 6356 card. So far, everything works smoothly: I can create a task, define channels, set up a clock, and start the task to receive live voltage readings. However, when I set a trigger using the cfg_anlg_edge_ref_trig() function, nidaqmx seems to hold the task until the entire measurement is completed. Consequently, I lose the live feed, which is not the desired behavior.
Below is a minimal working example of my script. Switching the trigger variable between True and False illustrates the issue (as you may notice, the comments are not up to date with this MWE, never mind...):
import nidaqmx as daq
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from nidaqmx.stream_readers import AnalogMultiChannelReader
def set_ai_measurement(number_samples, ai_channels, clock_frequency, trigger=False, trigger_channel='Dev1/ai0', pretrigger_samples=0, trigger_threshold=1):
    """ Configures and starts a finite analog input measurement.
    Args:
        number_samples (int): number of samples to acquire per channel.
        ai_channels (list of str): physical channels to read, e.g. 'Dev1/ai0'.
        clock_frequency (int): rate of the measurement's clock, in Hz.
        trigger (bool, optional): whether to configure an analog edge reference trigger.
        trigger_channel (str, optional): channel monitored by the trigger.
        pretrigger_samples (int, optional): number of samples recorded before the trigger.
        trigger_threshold (int, optional): threshold for the detection of the trigger, in volts.
    Returns:
        tuple: the started task and its AnalogMultiChannelReader.
    """
    current_time = datetime.now().strftime("%m%d%Y_%H%M%S")
    task = daq.Task(new_task_name='task_{}'.format(current_time))
    reader = AnalogMultiChannelReader(task.in_stream)
    for chan in ai_channels:
        task.ai_channels.add_ai_voltage_chan(
            chan,
            max_val=10,
            min_val=-10)
    if trigger:
        if pretrigger_samples < 0:
            pretrigger_samples = 0
        if trigger_threshold < 0:
            trigger_threshold = 1
        if pretrigger_samples == 0:
            pretrigger_samples = 2  # Minimum value allowed
        task.triggers.reference_trigger.cfg_anlg_edge_ref_trig(
            trigger_channel,
            pretrigger_samples,
            trigger_slope=daq.constants.Slope.RISING,
            trigger_level=trigger_threshold)
    task.timing.cfg_samp_clk_timing(
        clock_frequency,
        sample_mode=daq.constants.AcquisitionType.FINITE,
        samps_per_chan=number_samples)
    task.start()
    return task, reader
def read_daq(number_samples, number_ai_channels, reader):
    """ Reads the channels one sample at a time, then plots the recorded values.
    Args:
        number_samples (int): number of samples to read.
        number_ai_channels (int): number of analog input channels in the task.
        reader (AnalogMultiChannelReader): reader attached to the started task.
    Returns:
        int: 1 once the plot window has been closed.
    """
    _stop_event = False
    number_measurement = 0
    # Start from NaN so that samples never read do not show up as garbage in the plot
    raw_data = np.full((number_ai_channels, number_samples), np.nan)
    while not _stop_event:
        number_measurement += 1
        if number_measurement >= number_samples and number_samples > 0:
            break
        channels_data = np.empty((number_ai_channels, 1))
        channels_data.fill(np.nan)
        try:
            reader.read_many_sample(
                channels_data,
                number_of_samples_per_channel=1)
            print(channels_data)
        except daq.DaqError as err:
            # Report read errors (e.g. timeouts) instead of silently ignoring them
            print(err)
        # Store the newest sample in the first column, then rotate it to the end
        for i in range(number_ai_channels):
            raw_data[i, 0] = channels_data[i, 0]
        raw_data = np.roll(raw_data, -1, axis=1)
    plt.figure()
    for i in range(number_ai_channels):
        plt.plot(raw_data[i])
    plt.show()
    return 1
number_samples = 100
ai_channels = ['Dev1/ai0', 'Dev1/ai1']
clock_frequency = 20
trigger = True
trigger_channel = 'Dev1/ai0'
pretrigger_samples = 0
trigger_threshold = 4
task, reader = set_ai_measurement(number_samples, ai_channels, clock_frequency,
                                  trigger, trigger_channel, pretrigger_samples, trigger_threshold)
read_daq(number_samples, len(ai_channels), reader)
Am I missing something in my implementation?
Currently, I'm considering implementing peak detection myself to trigger data recording instead of relying on the card's trigger. However, I'd prefer to leverage the capabilities of the expensive NI card if possible.
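To make the software fallback concrete, here is a minimal sketch of what I have in mind. It is only an illustration: SoftwareEdgeTrigger and its feed() and record() methods are hypothetical names of my own, not part of nidaqmx, and the sketch assumes samples arrive one at a time (one value per channel), as in the read loop above. It keeps the last few samples in a ring buffer as pretrigger data and starts recording when the trigger channel crosses the threshold on a rising edge.

```python
from collections import deque

import numpy as np


class SoftwareEdgeTrigger:
    """Rising-edge trigger evaluated in software (hypothetical helper, not nidaqmx)."""

    def __init__(self, threshold, pretrigger_samples, posttrigger_samples):
        self.threshold = threshold
        self.posttrigger_samples = posttrigger_samples
        # Ring buffer holding only the most recent pretrigger samples
        self._pre = deque(maxlen=pretrigger_samples)
        # Samples recorded from the trigger sample onwards
        self._post = []
        self._previous = None
        self.triggered = False

    @property
    def done(self):
        """True once the trigger fired and enough samples were recorded."""
        return self.triggered and len(self._post) >= self.posttrigger_samples

    def feed(self, sample, trigger_index=0):
        """Feeds one sample (one value per channel); returns True when the record is complete."""
        sample = np.asarray(sample, dtype=float).ravel()
        if self.done:
            return True
        level = sample[trigger_index]
        if self.triggered:
            self._post.append(sample)
        else:
            # Rising edge: previous value below the threshold, current value at or above it
            rising = (self._previous is not None
                      and self._previous < self.threshold <= level)
            self._previous = level
            if rising:
                self.triggered = True
                self._post.append(sample)
            else:
                self._pre.append(sample)
        return self.done

    def record(self):
        """Returns the pretrigger and posttrigger samples, shape (channels, samples)."""
        return np.array(list(self._pre) + self._post).T
```

In the read loop, each new column (channels_data[:, 0]) would be passed to feed() while the live values keep being printed or plotted, and record() would be called once feed() returns True. The obvious drawback is that the trigger timing is then limited by how fast the Python loop runs, which is why I would rather use the hardware trigger.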
Any insights or suggestions would be greatly appreciated!