Counter/Timer

Synchronous sampling of analog voltage and digital encoder

My setup consists of a cDAQ-9185 chassis with NI-9401 and NI-9220 modules.
I want to sample the analog voltage channels on the NI-9220 at high frequency (2 kHz) and to sample a position encoder using a counter on the NI-9401.
The analog voltages and the encoder position should be sampled synchronously.

On the SW side, I am using the Python NIDAQmx package.

 

  • Is it possible to achieve truly synchronous sampling of the analog voltages and the position encoder (counter)? From what I have read, we have to use an implicit clock source for the counter, whereas the analog values are sampled by the internal AI timing engine.
  • In the meantime, I tried standalone high-frequency buffered sampling of the position encoder. I understand that since counters don't have an internal clock of their own, we have to use one of the other onboard clocks. So I chose to set up an "unused" digital input (DI) line and use its clock as the source for the counter. See the code below.
    unused_digital_task.di_channels.add_di_chan(
        "cDAQMod1/line7",
        line_grouping=nidaqmx.constants.LineGrouping.CHAN_PER_LINE,
    )
    unused_digital_task.timing.cfg_samp_clk_timing(
        rate=self.config.load_cell_sample_rate,
        sample_mode=AcquisitionType.CONTINUOUS,
    )

    angular_position_task.timing.cfg_samp_clk_timing(
        rate=self.config.angular_position_sample_rate,
        source="/cDAQMod1/di/SampleClock",
        active_edge=nidaqmx.constants.Edge.RISING,
        sample_mode=AcquisitionType.CONTINUOUS,
        # 2x oversized, as it determines the size of internal buffer on the DAQ card
        samps_per_chan=2 * self.config.angular_position_batch_size(),
    )



    I then RESERVE the "unused_digital_task" and "angular_position_task". When I run it, I get the error 

    nidaqmx.errors.DaqError: Source terminal to be routed could not be found on the device.
    Make sure the terminal name is valid for the specified device. Refer to Measurement & Automation Explorer for valid terminal names.
    Property: DAQmx_SampClk_Src
    Property: DAQmx_SampClk_ActiveEdge
    Source Device: cDAQMod1
    Source Terminal: di/SampleClock

    I don't understand what's wrong in my specification of the Source Terminal.

Message 1 of 6

You're close.

 

1. No need for a dummy DI task, you can just have the encoder task get its sample clock from your AI task.

2. With cDAQ, the *chassis* is most often responsible for sample clock timing.  So instead of designating the *module*, you would designate the chassis.   For example, "/cDAQchassis/ai/SampleClock"

3. The *other* important step for sync when you share a sample clock this way is to make sure to start all tasks that "borrow" the clock first and then start the task that *causes* it last.   So here, start the encoder task first and the AI task last.
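
Points 2 and 3 could be sketched in Python roughly as follows. This is a minimal sketch, not tested on hardware: `chassis_terminal` and `start_in_sync_order` are hypothetical helper names, the chassis name `cDAQ1` in the usage note is an assumed example (use whatever name NI MAX shows for your chassis), and a "task" here is anything with a `start()` method, such as `nidaqmx.Task`.

```python
from typing import Iterable, Protocol


class Startable(Protocol):
    """Anything with a start() method, e.g. nidaqmx.Task."""

    def start(self) -> None: ...


def chassis_terminal(chassis_name: str, terminal: str = "ai/SampleClock") -> str:
    """Build a chassis-level terminal name such as '/cDAQ1/ai/SampleClock'.

    The device part is the chassis name, not a module name.
    """
    return f"/{chassis_name.strip('/')}/{terminal.strip('/')}"


def start_in_sync_order(clock_owner: Startable, borrowers: Iterable[Startable]) -> None:
    """Start every task that borrows the sample clock, then the task that generates it."""
    for task in borrowers:
        task.start()  # armed, waiting for edges on the shared clock
    clock_owner.start()  # the shared clock only starts running here
```

With real tasks this would be used as `start_in_sync_order(ai_task, [encoder_task])`, after configuring the encoder task with `source=chassis_terminal("cDAQ1")`.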

 

I only program LabVIEW and don't know the Python syntax.  It's possible there are other things for you to do, but the stuff above should at least move you closer.

 

 

-Kevin P

Message 2 of 6

Reference: Synchronous Analog and Encoder Inputs

 

import pprint

import nidaqmx
from nidaqmx.constants import AcquisitionType

pp = pprint.PrettyPrinter(indent=4)

# Device names such as "9220", "9401" and "cDAQ" look like placeholders;
# substitute the names shown in NI MAX.
with nidaqmx.Task() as master_task, nidaqmx.Task() as slave_task:
    master_task.ai_channels.add_ai_voltage_chan("/9220/ai0:1")
    # add_ci_ang_encoder_chan returns the channel object, not a task
    encoder_chan = slave_task.ci_channels.add_ci_ang_encoder_chan("/cDAQ/ctr0")
    encoder_chan.ci_encoder_a_input_term = "/9401/PFI0"
    encoder_chan.ci_encoder_b_input_term = "/9401/PFI1"
    encoder_chan.ci_encoder_z_input_term = "/9401/PFI2"

    # The AI task generates the sample clock; the counter task borrows it
    master_task.timing.cfg_samp_clk_timing(
        2000, sample_mode=AcquisitionType.CONTINUOUS)
    slave_task.timing.cfg_samp_clk_timing(
        2000, source="/cDAQ/ai/SampleClock", sample_mode=AcquisitionType.CONTINUOUS)

    print('2 AI channels, 10 samples per read, 10 reads: ')
    # Start the task that borrows the clock first, the clock owner last
    slave_task.start()
    master_task.start()

    for _ in range(10):
        master_data = master_task.read(number_of_samples_per_channel=10)
        slave_data = slave_task.read(number_of_samples_per_channel=10)

        print('Master Task Data: ')
        pp.pprint(master_data)
        print('Slave Task Data: ')
        pp.pprint(slave_data)
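
Because both tasks run off the same sample clock and the encoder task is started before the clock begins, sample k from each read should belong to the same clock edge. The sketch below pairs the two reads on that assumption. `pair_samples` is a hypothetical helper, not part of nidaqmx; it expects the AI data indexed as [channel][sample] and the encoder data as a flat list, which is the shape `Task.read` returns for a multi-channel and a single-channel task respectively.

```python
from typing import List, Sequence, Tuple


def pair_samples(
    ai_data: Sequence[Sequence[float]],
    encoder_data: Sequence[float],
    rate_hz: float,
    first_index: int = 0,
) -> List[Tuple[float, float, Tuple[float, ...]]]:
    """Return (time_s, angle, voltages) rows, one per shared clock edge.

    first_index is the number of samples already consumed by earlier reads,
    so time_s is measured from the first sample clock edge.
    """
    n = len(encoder_data)
    if any(len(chan) != n for chan in ai_data):
        raise ValueError("AI and encoder reads must contain the same number of samples")
    rows = []
    for k in range(n):
        time_s = (first_index + k) / rate_hz
        voltages = tuple(chan[k] for chan in ai_data)
        rows.append((time_s, encoder_data[k], voltages))
    return rows
```

In the loop above this would be called as `pair_samples(master_data, slave_data, 2000.0, first_index=10 * i)` for the i-th read, since every read consumes exactly 10 samples from each task.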

 

-------------------------------------------------------
Applications Engineer | TME Systems
https://tmesystems.net/
Message 3 of 6

Thanks, everyone, for your responses. I think I am pretty close to getting it working.
Now I am stuck on a non-descriptive error that is thrown when I start the encoder task.

 

  File "/home/Documents/encoderai.py", line 88, in <module>
    angular_position_task.start()
  File "/home/Documents/venvs/wdtvenv_test/lib/python3.10/site-packages/nidaqmx/task.py", line 894, in start
    self._interpreter.start_task(self._handle)
  File "/home/Documents/venvs/wdtvenv_test/lib/python3.10/site-packages/nidaqmx/_library_interpreter.py", line 5403, in start_task
    self.check_for_error(error_code)
  File "/home/Documents/venvs/wdtvenv_test/lib/python3.10/site-packages/nidaqmx/_library_interpreter.py", line 6029, in check_for_error
    raise DaqError(extended_error_info, error_code)
nidaqmx.errors.DaqError: An internal error occurred.
Task Name: _unnamedTask<1>

 


Here is the minimal code to reproduce it:

 

import nidaqmx
import numpy as np
from nidaqmx.constants import (  # type: ignore
    AcquisitionType,
)
from nidaqmx.stream_readers import (  # type: ignore
    AnalogMultiChannelReader,
    CounterReader
)
import time
import logging
from typing import List, Optional
from dataclasses import field

logger = logging.getLogger(__name__)


# Raw sample rate of the load cell pins, in Hz.
load_cell_sample_rate: float = 2000.0

# Encoder pulses per revolution (PPR)
angular_encoder_ppr: int = 2048

# Encoder Sample Rate [Hz]
angular_position_sample_rate: float = 2000.0

    
# Allocate data buffers
angular_position_data = np.empty((200), dtype='double')
load_cell_data = np.empty((6, 200), dtype='float')

with nidaqmx.Task() as load_cell_task, nidaqmx.Task() as angular_position_task:

    load_cell_task.ai_channels.add_ai_voltage_chan('cDAQMod2/ai0')
    load_cell_task.ai_channels.add_ai_voltage_chan('cDAQMod2/ai1')
    load_cell_task.ai_channels.add_ai_voltage_chan('cDAQMod2/ai2')
    load_cell_task.ai_channels.add_ai_voltage_chan('cDAQMod2/ai3')
    load_cell_task.ai_channels.add_ai_voltage_chan('cDAQMod2/ai4')
    load_cell_task.ai_channels.add_ai_voltage_chan('cDAQMod2/ai5')

    load_cell_task.timing.cfg_samp_clk_timing(
        rate=load_cell_sample_rate,
        sample_mode=AcquisitionType.CONTINUOUS,
        samps_per_chan = 400, # 2x oversized, as it determines the size of internal buffer on the DAQ card
    )
    load_cell_task_reader = AnalogMultiChannelReader(load_cell_task.in_stream)

    def load_cell_callback(*_):
        load_cell_task_reader.read_many_sample(load_cell_data,
                                                number_of_samples_per_channel = 200,
                                                timeout = 1)
        print ("Load Cell call back at time = ", time.time())
        # print (load_cell_data)

    load_cell_task.register_every_n_samples_acquired_into_buffer_event(200,
                                                                       load_cell_callback)


    angular_position_task.ci_channels.add_ci_ang_encoder_chan("cDAQMod1/ctr0", 'position_channel',
                            decoding_type = nidaqmx.constants.EncoderType.X_4,
                            zidx_enable = True,
                            zidx_val = 0,
                            zidx_phase = nidaqmx.constants.EncoderZIndexPhase.ALOW_BLOW,
                            units = nidaqmx.constants.AngleUnits.DEGREES,
                            pulses_per_rev = 2048,
                            initial_angle = 0.0)
    
    angular_position_task.timing.cfg_samp_clk_timing(
        rate=angular_position_sample_rate,
        source="/cDAQ/ai/SampleClock",
        sample_mode=AcquisitionType.CONTINUOUS,
        samps_per_chan = 400, # 2x oversized, as it determines the size of internal buffer on the DAQ card
    )
    angular_position_task_reader = CounterReader(angular_position_task.in_stream)

    def angular_position_callback(*_):
        angular_position_task_reader.read_many_sample_double(angular_position_data,
                                                        number_of_samples_per_channel = 200,
                                                        timeout = 1)
        print ("Angular Pos Call back at time = ", time.time())
        print (angular_position_data)

    angular_position_task.register_every_n_samples_acquired_into_buffer_event(200,
                                                                              angular_position_callback)

    
    load_cell_task.start()
    angular_position_task.start()
                                                                
    while True:
        time.sleep(0.5)
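
For reference, the settings in the code above imply the following numbers. This is only a worked arithmetic sketch: the two helper names are hypothetical, and the factor of 4 reflects X4 decoding, which counts every edge of both the A and B signals.

```python
def encoder_resolution_deg(pulses_per_rev: int, decoding_multiplier: int = 4) -> float:
    """Angle represented by one count, in degrees."""
    return 360.0 / (pulses_per_rev * decoding_multiplier)


def callback_period_s(samples_per_event: int, sample_rate_hz: float) -> float:
    """Time between 'every N samples' events at a given sample rate."""
    return samples_per_event / sample_rate_hz


print(encoder_resolution_deg(2048))    # 0.0439453125 degrees per count
print(callback_period_s(200, 2000.0))  # 0.1 s between callbacks
```

So with 2048 PPR and X4 decoding there are 8192 counts per revolution, and each task's callback fires about ten times per second.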

        


 


I worked through the call stack of the error and think it doesn't like the angular_position_task configuration. After several trials, I concluded that the problem lies in the code around the angular_position_task reader (CounterReader) and the subsequent angular_position_callback routine.

I tried dropping the callback and doing a simple periodic read in the "while True" loop instead, and that works fine.

Any ideas what might be the issue?

 

Message 4 of 6

I don't have the actual hardware, but your code runs without issues for me on simulated devices.

I am using NI-DAQmx 2023 Q1 and Python 3.9.11.

 

Side note: you can use NI-DAQmx Syntax for Specifying Physical Channel Strings to add multiple channels.

load_cell_task.ai_channels.add_ai_voltage_chan('cDAQMod2/ai0:5')
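
To illustrate what the range syntax stands for, here is a small sketch that expands a simple `first:last` range into the individual channel names. `expand_channel_range` is a hypothetical helper written only for this illustration and handles just the ascending `prefixN:M` form; in practice the range string is passed straight to `add_ai_voltage_chan` as shown above.

```python
import re
from typing import List

# Matches a trailing "<first>:<last>" numeric range, e.g. "cDAQMod2/ai0:5"
_RANGE = re.compile(r"^(?P<prefix>.*?)(?P<first>\d+):(?P<last>\d+)$")


def expand_channel_range(spec: str) -> List[str]:
    """Expand 'cDAQMod2/ai0:5' into ['cDAQMod2/ai0', ..., 'cDAQMod2/ai5']."""
    match = _RANGE.match(spec)
    if match is None:
        return [spec]  # no range present, a single channel name
    first, last = int(match["first"]), int(match["last"])
    if last < first:
        raise ValueError("this sketch only handles ascending ranges")
    return [f"{match['prefix']}{i}" for i in range(first, last + 1)]
```

`expand_channel_range('cDAQMod2/ai0:5')` yields the same six names that the six separate `add_ai_voltage_chan` calls in the original code used.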
Message 5 of 6

Thanks for checking it out on the simulator.
My Python version is 3.10.12 and I am using the drivers from the 2023 Q2 release.

I still haven't found a real solution, but here is the workaround I have.

 

Essentially, I removed the encoder callback and instead have a single callback that reads the buffered data from both the analog channels and the encoder. This seems to work fine.
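
The workaround code itself was not posted, so the following is only a sketch of what a single shared callback might look like. `make_combined_callback` and `on_data` are hypothetical names; the reader methods are the ones used in the code earlier in the thread (`read_many_sample` on `AnalogMultiChannelReader`, `read_many_sample_double` on `CounterReader`). The callback takes the four arguments nidaqmx passes to "every N samples" callbacks and returns 0, as the nidaqmx documentation asks of such callbacks.

```python
def make_combined_callback(ai_reader, counter_reader, ai_buffer, counter_buffer,
                           samples_per_event, on_data, timeout_s=1.0):
    """Build one callback that drains both tasks' buffers together."""

    def callback(task_handle, every_n_samples_event_type, number_of_samples, callback_data):
        # Read the same number of samples from each task so they stay aligned
        ai_reader.read_many_sample(
            ai_buffer,
            number_of_samples_per_channel=samples_per_event,
            timeout=timeout_s,
        )
        counter_reader.read_many_sample_double(
            counter_buffer,
            number_of_samples_per_channel=samples_per_event,
            timeout=timeout_s,
        )
        on_data(ai_buffer, counter_buffer)
        return 0  # nidaqmx expects an integer return value

    return callback
```

The returned function would be registered on the analog task only, for example `load_cell_task.register_every_n_samples_acquired_into_buffer_event(200, callback)`, with no event registered on the encoder task.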

 

It looks like callback-based data extraction is not supported for encoder tasks (or is that specific to the 9401?). Can someone confirm this?

Message 6 of 6