Source code for lascar.engine.partitioner_engine

# This file is part of lascar
#
# lascar is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
#
# Copyright 2018 Manuel San Pedro, Victor Servant, Charles Guillemet, Ledger SAS - manuel.sanpedro@ledger.fr, victor.servant@ledger.fr, charles@ledger.fr

"""
partitioner_engine.py
"""
import numpy as np

from .engine import Engine


# class PartitionFunction:
#     def __init__(self, func, size):
#         self.function = func
#         self.size = size
#
#     def __call__(self, arg):
#         return self.function(arg)
#
#
# class guessFunction:
#     def __init__(self, func, guesses):
#         self.function = func
#         self.size = len(guesses)
#         self.guesses = guesses
#
#     def __call__(self, value):
#         return np.array([self.function(value,guess) for guess in self.guesses])


[docs]class PartitionerEngine(Engine): """ PartitionEngine is an abstract splecialized Engine which role is to partition the 'leakages', according to a 'partition_value' computed from the 'values'. The partition_value is computed by a fonction called 'partition', which ouputs are in partition_range. partition_value = partition(value) 0 <= partition_value < partition_size """ def __init__(self, partition_function, partition_range, order, name=None, jit=True): """ PartitionEngine :param name: the name chosen for the Engine :param session: the Session that will drive it :param partition_function: a function (or callable) which will be applied to the trace values and return a positive integer :param partition_range: the possible partition_values :param order: the order needed by the engine ( order=1: sum of leakages, order=2: sum of square of leakages,...) """ if isinstance(partition_range, int): self._partition_range = range(partition_range) else: self._partition_range = partition_range self._partition_range = np.array(self._partition_range, dtype=np.uint32) self._partition_size = len(self._partition_range) self._partition_range_to_index = np.zeros((self._partition_range.max()+1,), dtype=np.uint32) for i,j in enumerate(self._partition_range): self._partition_range_to_index[j] = i self._order = order self.jit = jit if jit: try: from numba import jit except Exception: raise Exception( "Cannot jit without Numba. Please install Numba or consider turning off the jit option" ) self._partition_function = jit(nopython=True)(partition_function) else: self._partition_function = partition_function Engine.__init__(self, name) def _initialize(self): self._acc_x_by_partition = np.zeros( (self._order, self._partition_size) + self._session.leakage_shape, dtype=np.double, ) # acc_x_by_partition[i,j,k] = sum( (leakages[k])**i | partition = j) self._partition_count = np.zeros((self._partition_size,), dtype=np.double) self.size_in_memory += self._acc_x_by_partition.nbytes self.size_in_memory += self._partition_count.nbytes if self.jit: from numba import jit @jit(nopython=True) def jitted_update(batchvalues, batchleakages, pfunc = self._partition_function, psize = self._partition_size, rng2idx = self._partition_range_to_index, order = self._order): pcount = np.zeros((psize,), dtype=np.uint32) acc_x = np.zeros((order, psize, batchleakages.shape[1]), dtype=np.double) for pv in np.arange(batchvalues.shape[0]): idx = rng2idx[pfunc(batchvalues[pv])] pcount[idx] += 1 acc_x[0, idx] += batchleakages[pv] for o in range(1,order): acc_x[o, idx] += np.power(batchleakages[pv], o + 1) return pcount, acc_x def new_update(batch): pcount, acc_x = jitted_update(batch.values, batch.leakages) self._partition_count += pcount self._acc_x_by_partition += acc_x self._update = new_update else: self._update = self._base_update def _base_update(self, batch): partition_values = list(map(self._partition_function, batch.values)) for i, v in enumerate(partition_values): idx = self._partition_range_to_index[v] self._partition_count[idx] += 1 for o in range(0, self._order): self._acc_x_by_partition[ o, idx ] += np.power(batch.leakages[i], o + 1, dtype=np.double) def _finalize(self): pass def _clean(self): del self._acc_x_by_partition del self._partition_count self.size_in_memory = 0
[docs] def get_mean_by_partition(self): """ Compute a np.array containing the means by partiion E[ leakage | partition(value) = i) for i in partiton_values :return: np.array containing the means by partition. """ acc = np.zeros(self._acc_x_by_partition.shape[1:], np.double) for v in self._partition_range: i = self._partition_range_to_index[v] acc[i] = self._acc_x_by_partition[0, i] / self._partition_count[i] return acc