Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lib and grpc time data types and conversions to enable future NI-DAQmx time support #449

Merged
merged 17 commits into from
Aug 31, 2023
1 change: 1 addition & 0 deletions generated/nidaqmx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from nidaqmx.grpc_session_options import *
from nidaqmx.scale import Scale
from nidaqmx.task import Task
from nidaqmx.time import AbsoluteTime
from nidaqmx.types import CtrFreq, CtrTick, CtrTime

try:
Expand Down
68 changes: 68 additions & 0 deletions generated/nidaqmx/time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import ctypes
import functools
from datetime import datetime, timezone

@functools.total_ordering
class AbsoluteTime(ctypes.Structure):
# Please visit ni.com/info and enter the Info Code NI_BTF for detailed information.
# The summary is:
# * lsb - positive fractions (2^-64) of a second
# * msb - number of whole seconds since 12am, Friday, January 1, 1904, UTC

_pack_ = 4
_fields_ = [("lsb", ctypes.c_uint64), ("msb", ctypes.c_int64)]

BIAS_FROM_1970_EPOCH = 2082844800
NUM_SUBSECONDS = 2 ** 64
NUM_MICROSECONDS = 1000000

@classmethod
def from_datetime(cls, dt):
utc_dt = dt.astimezone(tz=timezone.utc)

# First, calculate whole seconds by converting from the 1970 to 1904 epoch.
timestamp_1970_epoch = utc_dt.timestamp()
was_negative = timestamp_1970_epoch < 0
timestamp_1904_epoch = int(timestamp_1970_epoch + AbsoluteTime.BIAS_FROM_1970_EPOCH)

# Our bias is positive, so our sign should only change if we were previously negative.
is_negative = timestamp_1904_epoch < 0
if is_negative != was_negative and not was_negative:
raise OverflowError(f"Can't represent {dt.isoformat()} in AbsoluteTime (1904 epoch)")

# Finally, convert the microseconds to subseconds.
lsb = int(round(AbsoluteTime.NUM_SUBSECONDS * utc_dt.microsecond / AbsoluteTime.NUM_MICROSECONDS))

return AbsoluteTime(lsb=lsb, msb=timestamp_1904_epoch)

def to_datetime(self, tzinfo=timezone.utc):
# First, calculate whole seconds by converting from the 1904 to 1970 epoch.
timestamp_1904_epoch = self.msb
was_positive = timestamp_1904_epoch > 0
timestamp_1970_epoch = int(timestamp_1904_epoch - AbsoluteTime.BIAS_FROM_1970_EPOCH)

# Our bias is negative, so our sign should only change if we were previously positive.
is_positive = timestamp_1970_epoch > 0
if is_positive != was_positive and not was_positive:
raise OverflowError(f"Can't represent {str(self)} in datetime (1970 epoch)")

# Finally, convert the subseconds to microseconds.
microsecond = int(round(AbsoluteTime.NUM_MICROSECONDS * self.lsb / AbsoluteTime.NUM_SUBSECONDS))

# Start with UTC
dt = datetime.fromtimestamp(timestamp_1970_epoch, timezone.utc)
dt = dt.replace(microsecond=microsecond)
# Then convert to what was requested
return dt.astimezone(tz=tzinfo)

def __str__(self):
return f"AbsoluteTime(lsb=0x{self.lsb:x}, msb=0x{self.msb:x})"

def __eq__(self, other):
return self.msb == other.msb and self.lsb == other.lsb

def __lt__(self, other):
if self.msb == other.msb:
return self.lsb < other.lsb
else:
return self.msb < other.msb
1 change: 1 addition & 0 deletions src/handwritten/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from nidaqmx.grpc_session_options import *
from nidaqmx.scale import Scale
from nidaqmx.task import Task
from nidaqmx.time import AbsoluteTime
from nidaqmx.types import CtrFreq, CtrTick, CtrTime

try:
Expand Down
68 changes: 68 additions & 0 deletions src/handwritten/time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import ctypes
import functools
from datetime import datetime, timezone

@functools.total_ordering
class AbsoluteTime(ctypes.Structure):
zhindes marked this conversation as resolved.
Show resolved Hide resolved
# Please visit ni.com/info and enter the Info Code NI_BTF for detailed information.
# The summary is:
# * lsb - positive fractions (2^-64) of a second
# * msb - number of whole seconds since 12am, Friday, January 1, 1904, UTC

_pack_ = 4
_fields_ = [("lsb", ctypes.c_uint64), ("msb", ctypes.c_int64)]

BIAS_FROM_1970_EPOCH = 2082844800
NUM_SUBSECONDS = 2 ** 64
NUM_MICROSECONDS = 1000000

@classmethod
def from_datetime(cls, dt):
utc_dt = dt.astimezone(tz=timezone.utc)

# First, calculate whole seconds by converting from the 1970 to 1904 epoch.
timestamp_1970_epoch = utc_dt.timestamp()
was_negative = timestamp_1970_epoch < 0
timestamp_1904_epoch = int(timestamp_1970_epoch + AbsoluteTime.BIAS_FROM_1970_EPOCH)

# Our bias is positive, so our sign should only change if we were previously negative.
is_negative = timestamp_1904_epoch < 0
if is_negative != was_negative and not was_negative:
raise OverflowError(f"Can't represent {dt.isoformat()} in AbsoluteTime (1904 epoch)")

# Finally, convert the microseconds to subseconds.
lsb = int(round(AbsoluteTime.NUM_SUBSECONDS * utc_dt.microsecond / AbsoluteTime.NUM_MICROSECONDS))

return AbsoluteTime(lsb=lsb, msb=timestamp_1904_epoch)

def to_datetime(self, tzinfo=timezone.utc):
# First, calculate whole seconds by converting from the 1904 to 1970 epoch.
timestamp_1904_epoch = self.msb
was_positive = timestamp_1904_epoch > 0
timestamp_1970_epoch = int(timestamp_1904_epoch - AbsoluteTime.BIAS_FROM_1970_EPOCH)

# Our bias is negative, so our sign should only change if we were previously positive.
is_positive = timestamp_1970_epoch > 0
if is_positive != was_positive and not was_positive:
raise OverflowError(f"Can't represent {str(self)} in datetime (1970 epoch)")

# Finally, convert the subseconds to microseconds.
microsecond = int(round(AbsoluteTime.NUM_MICROSECONDS * self.lsb / AbsoluteTime.NUM_SUBSECONDS))

# Start with UTC
dt = datetime.fromtimestamp(timestamp_1970_epoch, timezone.utc)
dt = dt.replace(microsecond=microsecond)
# Then convert to what was requested
return dt.astimezone(tz=tzinfo)

def __str__(self):
return f"AbsoluteTime(lsb=0x{self.lsb:x}, msb=0x{self.msb:x})"

def __eq__(self, other):
return self.msb == other.msb and self.lsb == other.lsb

def __lt__(self, other):
if self.msb == other.msb:
return self.lsb < other.lsb
else:
return self.msb < other.msb
85 changes: 85 additions & 0 deletions tests/component/test_time.py
zhindes marked this conversation as resolved.
Show resolved Hide resolved
zhindes marked this conversation as resolved.
Show resolved Hide resolved
zhindes marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from copy import copy
import pytest
import random

from datetime import datetime, timedelta, timezone
from nidaqmx.time import AbsoluteTime

# Jan 1, 2002 = 98 years + 25 leapdays = 35,795 days = 3,092,688,000 seconds
JAN_01_2022_BTF = AbsoluteTime(lsb=0, msb=0xb856ac80)
zhindes marked this conversation as resolved.
Show resolved Hide resolved
JAN_01_2022_DATETIME = datetime(2002, 1, 1, tzinfo=timezone.utc)


def test___time___convert_from_utc___succeeds():
btf = AbsoluteTime.from_datetime(JAN_01_2022_DATETIME)
assert btf == JAN_01_2022_BTF


def test___time___convert_to_utc___succeeds():
dt = JAN_01_2022_BTF.to_datetime()
assert dt == JAN_01_2022_DATETIME


@pytest.mark.parametrize(
"tzinfo, expected_offset", [
(timezone(timedelta(minutes=30)), -1800),
(timezone(timedelta(minutes=-30)), 1800),
(timezone(timedelta(hours=1)), -3600),
(timezone(timedelta(hours=-1)), 3600),
]
)
def test___time___convert_to_and_from_tz___succeeds(tzinfo, expected_offset):
from_dt = datetime(2002, 1, 1, tzinfo=tzinfo)
btf = AbsoluteTime.from_datetime(from_dt)

assert btf.msb == JAN_01_2022_BTF.msb + expected_offset
assert btf.lsb == JAN_01_2022_BTF.lsb

# now convert back
to_dt = btf.to_datetime(tzinfo=tzinfo)
assert from_dt == to_dt


@pytest.mark.parametrize(
"microsecond, subseconds", [
(0, 0),
(1, 0x10C6F7A0B5EE),
(250000, 0x4000000000000000),
(500000, 0x8000000000000000),
(750000, 0xC000000000000000),
(999999, 0xFFFFEF39085F4800),
]
)
def test___time___convert_microseconds_to_and_from_subseconds___succeeds(microsecond, subseconds):
from_dt = JAN_01_2022_DATETIME.replace(microsecond=microsecond)
btf = AbsoluteTime.from_datetime(from_dt)

# whole seconds shouldn't change
assert btf.msb == JAN_01_2022_BTF.msb
assert btf.lsb == subseconds

# now convert back
to_dt = btf.to_datetime()
assert to_dt.microsecond == microsecond

# zero out microsecond, and we should be back
assert to_dt.replace(microsecond=0) == JAN_01_2022_DATETIME


# Note: I can't actually test overflow because Python's datetime object is
# limited to years 1-9999, but the NI-BTF format can represent time before the
# Big Bang and until about year 292 billion. Oh well.


def test___time___ordering___succeeds():
ordered = [
AbsoluteTime(msb=1, lsb=0),
AbsoluteTime(msb=2, lsb=0),
AbsoluteTime(msb=2, lsb=1),
AbsoluteTime(msb=2, lsb=2),
AbsoluteTime(msb=3, lsb=0)
]

shuffled = copy(ordered)
random.shuffle(shuffled)
assert sorted(shuffled) == ordered
Loading