# This file is part of lascar
#
# lascar is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# Copyright 2018 Manuel San Pedro, Victor Servant, Charles Guillemet, Ledger SAS - manuel.sanpedro@ledger.fr, victor.servant@ledger.fr, charles@ledger.fr
"""
container.py
"""
import logging
import numpy as np
from collections import namedtuple
_Trace = namedtuple("Trace", ["leakage", "value"])
[docs]class Trace(_Trace):
"""
Trace is the class to represent a side-channel trace (Trace in lascar),
Genuinely, it is a tuple of two items:
- The first item is "leakage" and represents the side-channel observable
- The second item is "value" and represents the handled values during the observation of "leakage".
The only restriction here is that "leakage" and "data" must be numpy.arrays of any shape.
trace = (leakage, value) where leakage and value are numpy.arrays
"""
def __new__(cls, *args):
super_obj = super(_Trace, cls)
if len(args) != 2:
raise ValueError("Trace: Only two arguments.")
# if not isinstance(args[0], np.ndarray) or not isinstance(args[1], np.ndarray):
# raise ValueError("Trace: leakage and value must be numpy.ndarray.")
return super_obj.__new__(cls, args)
def __str__(self):
return "Trace with leakage:[%s, %s] value:[%s, %s]" % (
self.leakage.shape,
self.leakage.dtype,
self.value.shape,
self.value.dtype,
)
def __repr__(self):
return "{}({})".format(
self.__class__.__name__,
", ".join("{}=%r".format(name) for name in self._fields) % self,
)
def __eq__(self, other):
return np.all(self.leakage == other.leakage) and np.all(
self.value == other.value
)
def __ne__(self, other):
return not self.__eq__(other)
def __add__(self, other):
if isinstance(other, Trace):
return TraceBatchContainer(
np.vstack([self.leakage, other.leakage])
if len(self.leakage.shape) > 0
else np.hstack([self.leakage, other.leakage]),
np.vstack([self.value, other.value])
if len(self.value.shape) > 0
else np.hstack([self.value, other.value]),
)
if isinstance(other, TraceBatchContainer):
return TraceBatchContainer(
np.vstack(
[self.leakage, other.leakages]
if len(self.leakage.shape) > 0
else np.hstack([self.leakage, other.leakages])
),
np.vstack([self.value, other.values])
if len(self.value.shape) > 0
else np.hstack([self.value, other.values]),
)
[docs]class Container:
"""
Container class is an abstact class used to represent Side-Channel traces.
In lascar a trace is a couple (tuple) of a side-channel leakage associated to the values handled.
trace = (leakage, value)
Where both leakage and value can be represented as a numpy array.
The role of the Container class is to be overloaded so that it can deliver traces, stored as a specified format.
Mostly, the __getitem__/__setitem__ have to be overloaded when user want to write its own Container format class.
:param number_of_traces:
"""
def __init__(self, **kwargs):
"""
Basic Constructor.
To this point, child must implement leakages and values as np.array or AbstractArray
"""
self.logger = logging.getLogger(__name__)
if self.leakages.shape[0] != self.values.shape[0]:
raise ValueError(
"leakages and values dont share the same first dim (number_of_traces): %s %s"
% (self.leakages.shape, self.values.shape)
)
self.number_of_traces_max = self.leakages.shape[0]
self._leakage_base_abstract = AbstractArray(
self.leakages.shape[1:], self.leakages.dtype
)
self._leakage_section_abstract = AbstractArray(
self.leakages.shape[1:], self.leakages.dtype
)
self._leakage_abstract = AbstractArray(
self.leakages.shape[1:], self.leakages.dtype
)
self._value_base_abstract = AbstractArray(
self.values.shape[1:], self.values.dtype
)
self._value_section_abstract = AbstractArray(
self.values.shape[1:], self.values.dtype
)
self._value_abstract = AbstractArray(self.values.shape[1:], self.values.dtype)
self.number_of_traces = kwargs.get(
"number_of_traces", self.number_of_traces_max
)
self.leakage_section = kwargs.get("leakage_section", None)
self.value_section = kwargs.get("value_section", None)
self.leakage_processing = kwargs.get("leakage_processing", None)
self.value_processing = kwargs.get("value_processing", None)
@property
def leakage_section(self):
"""
Leakage area to be read from the original leakage.
:type: list, range, slice
"""
return self._leakage_section if hasattr(self, "_leakage_section") else None
@leakage_section.setter
def leakage_section(self, section):
self.logger.debug("Setting leakage_section to %s" % section)
if self.leakage_processing is not None:
self.logger.warning(
"Since leakage_section is being set, leakage_processing is set to None."
)
self.leakage_processing = None
if section is None: # reset leakage_section and leakage_end
self._leakage_section_abstract.update(self._leakage_base_abstract)
self._leakage_abstract.update(self._leakage_base_abstract)
else:
try:
leakage = self._leakage_base_abstract.zeros()[section]
self._leakage_section_abstract.update(leakage)
self._leakage_abstract.update(leakage)
except:
raise TypeError(
'Cant apply section "%s" to leakage of shape/dtype %s/%s'
% (
section,
self._leakage_base_abstract.shape,
self._leakage_base_abstract.dtype,
)
)
self._leakage_section = section
@property
def leakage_processing(self):
"""
Leakage_processing. function applied upon the leakages after reading and
leakage_section)
:type: function (or callable) taking leakage[leakage_section] as an
argument
"""
return (
self._leakage_processing if hasattr(self, "_leakage_processing") else None
)
@leakage_processing.setter
def leakage_processing(self, processing):
self.logger.debug("Setting leakage_processing to %s" % processing)
if processing is None:
self._leakage_abstract.update(self._leakage_section_abstract)
else:
try:
leakage = processing(self._leakage_section_abstract.zeros())
self._leakage_abstract.update(leakage)
except:
raise TypeError(
'Cant apply processing "%s" to leakage of shape/dtype %s/%s'
% (
processing,
self._leakage_base_abstract.shape,
self._leakage_base_abstract.dtype,
)
)
self._leakage_processing = processing
[docs] def apply_leakage_section(self, leakages):
if self.leakage_section is None:
return leakages
if self._leakage_base_abstract.shape == ():
return leakages[self.leakage_section]
return leakages[:, self.leakage_section]
[docs] def apply_leakage_processing(self, leakages):
if self.leakage_processing is None:
return leakages
if self._leakage_section_abstract.shape == (): # 0D leakage
return np.array([self.leakage_processing(l) for l in leakages])
return np.apply_along_axis(self.leakage_processing, 1, leakages)
[docs] def apply_both_leakage(self, leakages):
return self.apply_leakage_processing(self.apply_leakage_section(leakages))
@property
def value_section(self):
"""
Value area to be read from the original value.
:type: list, range, slice
"""
return self._value_section if hasattr(self, "_value_section") else None
@value_section.setter
def value_section(self, section):
self.logger.debug("Setting value_section to %s" % section)
if self.value_processing is not None:
self.logger.warning(
"Since value_section is being set, value_processing is set to None."
)
self.value_processing = None
if section is None: # reset value_section and value_end
self._value_section_abstract.update(self._value_base_abstract)
self._value_abstract.update(self._value_base_abstract)
else:
try:
value = self._value_base_abstract.zeros()[section]
self._value_section_abstract.update(value)
self._value_abstract.update(value)
except:
raise TypeError(
'Cant apply section "%s" to value of shape/dtype %s/%s'
% (
section,
self._value_base_abstract.shape,
self._value_base_abstract.dtype,
)
)
self._value_section = section
@property
def value_processing(self):
"""
Current value_processing. function applied upon the values after
reading and value_section.
:type: function (or callable) taking value[value_section] as an argument
"""
return self._value_processing if hasattr(self, "_value_processing") else None
@value_processing.setter
def value_processing(self, processing):
self.logger.debug("Setting value_processing to %s" % processing)
if processing is None:
self._value_abstract.update(self._value_section_abstract)
else:
try:
value = processing(self._value_section_abstract.zeros())
self._value_abstract.update(value)
except:
raise TypeError(
'Cant apply processing "%s" to value of shape/dtype %s/%s'
% (
processing,
self._value_base_abstract.shape,
self._value_base_abstract.dtype,
)
)
self._value_processing = processing
[docs] def apply_value_section(self, values):
if self.value_section is None:
return values
if self._value_base_abstract.shape == ():
return values[self.value_section]
return values[:, self.value_section]
[docs] def apply_value_processing(self, values):
if self.value_processing is None:
return values
if self._value_section_abstract.shape == (): # 0D value
return np.array([self.value_processing(v) for v in values])
return np.apply_along_axis(self.value_processing, 1, values)
[docs] def apply_both_value(self, values):
return self.apply_value_processing(self.apply_value_section(values))
[docs] def plot_leakage(self, key):
from lascar.plotting import plot
self.logger.debug("plot_leakage at index %s", key)
plot([self[i].leakage for i in key])
def __len__(self):
return self.number_of_traces
def __iter__(self):
"""
Container is iterable
It returns its trace during iteration.
:return: Iterator over the container traces.
"""
for i in range(len(self)):
yield self[i]
def __str__(self):
res = "Container with %d traces. " % (self.number_of_traces)
res += "leakages: %s, values: %s. " % (
self._leakage_abstract,
self._value_abstract,
)
if self.leakage_section is not None:
res += "leakage_section set to %s. " % self.leakage_section
if self.leakage_processing is not None:
res += "leakage_processing set to %s. " % self.leakage_processing
if self.value_section is not None:
res += "value_section set to %s. " % self.value_section
if self.value_processing is not None:
res += "value_processing set to %s. " % self.value_processing
return res
def __eq__(self, other):
if len(self) != len(other):
return False
for i in range(len(self)):
if self[i] != other[i]:
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
[docs] def get_leakage_mean_var(self):
"""
Compute mean/var of the leakage.
:return: mean/var of the container leakages
"""
from lascar import Session
session = Session(self).run()
return session["mean"].finalize(), session["var"].finalize()
# def to_trace(func):
# def wrapper(*args, **kwargs):
# result = func(*args, **kwargs)
# if isinstance(result, Trace):
# return result
# try:
# return Trace(result)
# except:
# raise TypeError("the output must be a 2-uple of numpy array: leakage,value.")
# return result
# return wrapper
[docs]class AbstractContainer(Container):
"""
AbstractContainer is a Container class used when the side-channel traces are generated from functions.
It can be used for instance:
- setting up sitions with oscilloscope and DUT
- when implementing Simulated traces
"""
def __init__(self, number_of_traces, **kwargs):
self.logger = logging.getLogger(__name__)
trace = self.generate_trace(0)
self.leakages = AbstractArray(
(number_of_traces,) + trace.leakage.shape, trace.leakage.dtype
)
self.values = AbstractArray(
(number_of_traces,) + trace.value.shape, trace.value.dtype
)
Container.__init__(self, **kwargs)
[docs] def generate_trace(self, idx):
"""
Generates a single trace indexed by an integer i
(the i^th trace)
Function to be overloaded
:param idx: integer
:return: a Trace
"""
raise NotImplemented
[docs] def generate_trace_batch(self, idx_begin, idx_end):
"""
Generates a trace_batch of specified indexes
"""
leakages = np.empty(
(idx_end - idx_begin,) + self._leakage_abstract.shape,
self._leakage_abstract.dtype,
)
values = np.empty(
(idx_end - idx_begin,) + self._value_abstract.shape,
self._value_abstract.dtype,
)
for i, j in enumerate(range(idx_begin, idx_end)):
leakage, value = self.generate_trace(j)
leakages[i] = leakage
values[i] = value
return TraceBatchContainer(leakages, values)
def __getitem__(self, key):
"""
:rtype: Trace or TraceBatch depending on key
"""
self.logger.debug("__getitem__ with key %s" % str(key))
if isinstance(key, int):
leakage, value = self.generate_trace(key)
leakage = self.apply_both_leakage(leakage.reshape((1,) + leakage.shape))[0]
value = self.apply_both_value(value.reshape((1,) + value.shape))[0]
return Trace(leakage, value)
elif isinstance(key, slice):
# check contiguity:
if key.step is not None and key.step > 1:
raise ValueError(
"AbstractContainer __getitem__ slice elements must be contiguous"
)
offset_begin = key.start if key.start else 0
offset_end = key.stop if key.stop else self.number_of_traces
elif isinstance(key, list):
# check contiguity:
if np.any(np.diff(np.array(key)) != 1):
raise ValueError(
"AbstractContainer __getitem__ list elements must be contiguous"
)
offset_begin = key[0]
offset_end = key[-1]
else:
raise ValueError(
"AbstractContainer __getitem__ only accepts int, list and slices (contiguous)"
)
if (
offset_begin < 0
or offset_end <= offset_begin
or offset_end > self.number_of_traces
):
raise ValueError(
"get_batch must have 0 <= offset_begin < offset_end must <= %d. Got (%d, %d)"
% (self.number_of_traces, offset_begin, offset_end)
)
leakages = np.empty(
(offset_end - offset_begin,) + self._leakage_abstract.shape,
self._leakage_abstract.dtype,
)
values = np.empty(
(offset_end - offset_begin,) + self._value_abstract.shape,
self._value_abstract.dtype,
)
try:
trace_batch = self.generate_trace_batch(offset_begin, offset_end)
leakages = self.apply_both_leakage(trace_batch.leakages)
values = self.apply_both_value(trace_batch.values)
return TraceBatchContainer(leakages, values)
except:
pass
for i, j in enumerate(range(offset_begin, offset_end)):
leakage, value = self.generate_trace(j)
leakages[i] = self.apply_both_leakage(
leakage.reshape((1,) + leakage.shape)
)[0]
values[i] = self.apply_both_value(value.reshape((1,) + value.shape))[0]
return TraceBatchContainer(leakages, values)
[docs]class TraceBatchContainer(Container):
def __init__(self, *args, **kwargs):
if len(args) == 2:
self.leakages = args[0] if not kwargs.get("copy", 0) else np.copy(args[0])
self.values = args[1] if not kwargs.get("copy", 0) else np.copy(args[1])
Container.__init__(self, **kwargs)
def __getitem__(self, key):
"""
Trace or TraceBatch getter
:param key: an int or an iterable
:return: the Trace or TraceBatch containing the Trace in key
"""
self.logger.debug("__getitem__ with key %s %s" % (str(key), type(key)))
if isinstance(key, (int, np.int64)):
leakage = self.apply_both_leakage(self.leakages[key : key + 1])[0]
value = self.apply_both_value(self.values[key : key + 1])[0]
return Trace(leakage, value)
else:
leakages = self.apply_both_leakage(self.leakages[key])
values = self.apply_both_value(self.values[key])
return TraceBatchContainer(leakages, values)
def __setitem__(self, key, value):
self.logger.debug("__setitem__ with key %s to %s", str(key), str(value))
self.leakages[key] = value.leakage if isinstance(key, int) else value.leakages
self.values[key] = value.value if isinstance(key, int) else value.values
[docs] def save(self, filename):
"""
Save the current TraceBatchContainer to a file using np.save
:param filename:
:return:
"""
self.logger.debug("save TraceBatch to %s" % filename)
with open(filename, "wb") as f:
np.savez(f, leakages=self.leakages, values=self.values)
[docs] @staticmethod
def load(filename):
"""
Load a file using np.load and create from it a TraceBatchContainer.
:param filename:
:return:
"""
tmp = np.load(filename)
return TraceBatchContainer(tmp["leakages"], tmp["values"])
pass
def __add__(self, other):
if isinstance(other, Trace):
return TraceBatchContainer(
np.vstack([self.leakages, other.leakage])
if len(self.leakages.shape) > 1
else np.hstack([self.leakages, other.leakage]),
np.vstack([self.values, other.value])
if len(self.values.shape) > 1
else np.hstack([self.values, other.value]),
)
if isinstance(other, TraceBatchContainer):
return TraceBatchContainer(
np.vstack([self.leakages, other.leakages])
if len(self.leakages.shape) > 1
else np.hstack([self.leakages, other.leakages]),
np.vstack([self.values, other.values])
if len(self.values.shape) > 1
else np.hstack([self.values, other.values]),
)
def __eq__(self, other):
if not isinstance(other, TraceBatchContainer):
return Container.__eq__(self, other)
if len(self) != len(other):
return False
return np.all(self.leakages == other.leakages) and np.all(
self.values == other.values
)
[docs] def get_leakage_mean_var(self):
"""
Compute mean/var of the leakage.
:return: mean/var of the container leakages
"""
try:
mean, var = self.leakages.mean(0), self.leakages.var(0)
return self.apply_both_value(mean), self.apply_both_value(var)
except:
return Container.get_leakage_mean_var()
[docs] @staticmethod
def export(container):
return container[:]
[docs]class AbstractArray:
"""
Used when your leakage or data cannot be represented as an
array (most of the time in
:class:`lascar.container.container.AbstractContainer`)
It simply emulates a few methods needed by other classes (such as
:class:`lascar.session.Session`)
:param shape: the shape of your leakages (or values)
:param dtype: the dtype of your leakages (or values)
"""
def __init__(self, shape, dtype):
self.shape = shape
self.dtype = dtype
[docs] def update(self, array):
self.shape = array.shape
self.dtype = array.dtype
[docs] def zeros(self):
return np.zeros(self.shape, self.dtype)
def __str__(self):
return "[%s, %s]" % (self.shape, self.dtype)
[docs]class AcquisitionFromGetters(AbstractContainer):
"""
An AcquisitionFromGetters is built from 2 object whose role are similar:
- value_getter which delivers values (for instance a class communicating with the dut)
- leakage_getter which delivers leakages (for instance an oscilloscope)
value_getter and leakage_getter must either:
- be iterable: each iteration returns the leakage or value
OR
- implement a .get() method which returns leakage or value
"""
def __init__(self, number_of_traces, value_getter, leakage_getter, **kargs):
self.value_getter = value_getter
self.leakage_getter = leakage_getter
if hasattr(value_getter, "__iter__"):
self.get_value = lambda: next(self.value_getter)
elif hasattr(value_getter, "get"):
self.get_value = lambda: self.value_getter.get()
else:
raise ValueError(
"value_getter must either be an iterator/generator OR implement a get() method"
)
if hasattr(leakage_getter, "__iter__"):
self.get_value = lambda: next(self.leakage_getter)
elif hasattr(leakage_getter, "get"):
self.get_leakage = lambda: self.leakage_getter.get()
else:
raise ValueError(
"leakage_getter must either be an iterator/generator OR implement a get() method"
)
AbstractContainer.__init__(self, number_of_traces, **kargs)
self.logger.info("Creating AcquisitionFromGenerators.")
[docs] def generate_trace(self, idx):
self.logger.debug("Generate trace %d." % (idx))
value = self.get_value()
leakage = self.get_leakage()
return Trace(leakage, value)