Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

ppk2_test = PPK2_API(ppk2_port)
ppk2_test.get_modifiers()
ppk2_test.use_ampere_meter() # set ampere meter mode
ppk2_test.toggle_DUT_power("OFF") # disable DUT power
ppk2_test.set_source_voltage(3300)

ppk2_test.use_source_meter() # set source meter mode
ppk2_test.toggle_DUT_power("ON") # enable DUT power

ppk2_test.start_measuring() # start measuring
# measurements are a constant stream of bytes
Expand All @@ -34,14 +36,16 @@
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")
time.sleep(0.01)

ppk2_test.toggle_DUT_power("ON")
ppk2_test.toggle_DUT_power("OFF") # disable DUT power

ppk2_test.use_ampere_meter() # set ampere meter mode

ppk2_test.start_measuring()
for i in range(0, 1000):
read_data = ppk2_test.get_data()
if read_data != b'':
samples = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")
time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period
time.sleep(0.01) # lower time between sampling -> less samples read in one sampling period

ppk2_test.stop_measuring()
36 changes: 19 additions & 17 deletions example_mp.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,51 @@

"""
Basic usage of PPK2 Python API - multiprocessing version
Basic usage of PPK2 Python API - multiprocessing version.
The basic ampere mode sequence is:
1. read modifiers
2. set ampere mode
3. read stream of data
"""
import time
from ppk2_api.ppk2_api import PPK2_MP
from ppk2_api.ppk2_api import PPK2_MP as PPK2_API

ppk2s_connected = PPK2_MP.list_devices()
ppk2s_connected = PPK2_API.list_devices()
if(len(ppk2s_connected) == 1):
ppk2_port = ppk2s_connected[0]
print(f'Found PPK2 at {ppk2_port}')
else:
print(f'Too many connected PPK2\'s: {ppk2s_connected}')
exit()

ppk2_test = PPK2_MP(ppk2_port)
ppk2_test = PPK2_API(ppk2_port, buffer_max_size_seconds=1, buffer_chunk_seconds=0.01)
ppk2_test.get_modifiers()
ppk2_test.use_ampere_meter() # set ampere meter mode
ppk2_test.toggle_DUT_power("OFF") # disable DUT power
ppk2_test.set_source_voltage(3300)

ppk2_test.start_measuring() # start measuring
ppk2_test.use_source_meter() # set source meter mode
ppk2_test.toggle_DUT_power("ON") # enable DUT power

ppk2_test.start_measuring() # start measuring
# measurements are a constant stream of bytes
# multiprocessing variant starts a process in the background which constantly
# polls the device in order to prevent losing samples. It will buffer the
# last 10s (by default) of data so get_data() can be called less frequently.
for i in range(0, 10):
# the number of measurements in one sampling period depends on the wait between serial reads
# it appears the maximum number of bytes received is 1024
# the sampling rate of the PPK2 is 100 samples per millisecond
for i in range(0, 1000):
read_data = ppk2_test.get_data()
if read_data != b'':
samples = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")
time.sleep(0.5)
time.sleep(0.001)

ppk2_test.toggle_DUT_power("ON")
ppk2_test.toggle_DUT_power("OFF") # disable DUT power

ppk2_test.use_ampere_meter() # set ampere meter mode

ppk2_test.start_measuring()
for i in range(0, 10):
for i in range(0, 1000):
read_data = ppk2_test.get_data()
if read_data != b'':
samples = ppk2_test.get_samples(read_data)
print(f"Average of {len(samples)} samples is: {sum(samples)/len(samples)}uA")
time.sleep(0.5) # lower time between sampling -> less samples read in one sampling period

ppk2_test.stop_measuring()
time.sleep(0.001) # lower time between sampling -> less samples read in one sampling period

ppk2_test.stop_measuring()
15 changes: 9 additions & 6 deletions src/ppk2_api/ppk2_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ def run(self):
self._buffer_q.get()
local_buffer = local_buffer[self._buffer_chunk:]
self._last_timestamp = tm_now
#print(len(d), len(local_buffer), self._buffer_q.qsize())
# print(len(d), len(local_buffer), self._buffer_q.qsize())

# calculate stats
s += len(d)
Expand All @@ -423,7 +423,7 @@ def get_data(self):
count = 0
while True:
try:
ret += self._buffer_q.get(timeout=0.01) # get_nowait sometimes skips a chunk for some reason
ret += self._buffer_q.get(timeout=0.001) # get_nowait sometimes skips a chunk for some reason
count += 1
except queue.Empty:
break
Expand All @@ -435,15 +435,18 @@ class PPK2_MP(PPK2_API):
Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns
a background process on start_measuring()
'''
def __init__(self, port, buffer_seconds=10):
def __init__(self, port, buffer_max_size_seconds=10, buffer_chunk_seconds=0.1):
'''
port - port where PPK2 is connected
buffer_seconds - how many seconds of data to keep in the buffer
buffer_max_size_seconds - how many seconds of data to keep in the buffer
buffer_chunk_seconds - how many seconds of data to put in the queue at once
'''
super().__init__(port)

self._fetcher = None
self._quit_evt = multiprocessing.Event()
self._buffer_seconds = buffer_seconds
self._buffer_max_size_seconds = buffer_max_size_seconds
self._buffer_chunk_seconds = buffer_chunk_seconds

def __del__(self):
"""Destructor"""
Expand All @@ -467,7 +470,7 @@ def start_measuring(self):
if self._fetcher is not None:
return

self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_seconds)
self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_max_size_seconds, self._buffer_chunk_seconds)
self._fetcher.start()

def stop_measuring(self):
Expand Down