from pandas import Series, DataFrame
from numpy import array, arange, log10, ndarray
from .expected import _test_
from .constants import digs_dict
from .stats import Z_score
def _set_N_(len_df, limit_N):
    """Return the sample size N to use, optionally capped by ``limit_N``.

    Args:
        len_df: number of records in the data.
        limit_N: None for no cap, or a non-negative integer used as an
            upper bound for N.

    Returns:
        ``len_df`` when ``limit_N`` is None or greater than ``len_df``;
        otherwise ``limit_N`` as a plain int.

    Raises:
        ValueError: if ``limit_N`` is neither None nor a non-negative
            integer.
    """
    from numbers import Integral

    if limit_N is None:
        return len_df
    # Validate before comparing with len_df: otherwise a non-integer above
    # len_df would be silently accepted, and a non-numeric value would
    # raise TypeError instead of the documented ValueError.
    if not isinstance(limit_N, Integral) or limit_N < 0:
        raise ValueError("limit_N must be None or a positive integer.")
    if limit_N > len_df:
        return len_df
    # Zero passes the check above (only negatives are rejected).
    return int(limit_N)
def get_mantissas(arr):
    """Computes the mantissas, the non-integer part of the log of a number.

    Args:
        arr: array of integers or floats

    Returns:
        Array of floats with the mantissas of the base-10 logs
    """
    # The absolute value is taken of the logarithm, not of the input.
    # NOTE(review): for inputs in (0, 1) the log is negative, so this yields
    # the fractional part of |log10(x)| (0.2 -> ~0.699), not log10(x) mod 1
    # (~0.301) -- confirm this is the intended definition.
    abs_logs = abs(log10(arr))
    # astype(int) truncates, leaving the integer part of each log
    whole_part = abs_logs.astype(int)
    return abs_logs - whole_part
def prepare(data, digs, limit_N, simple=False, confidence=None):
    """Transforms the original number sequence into a DataFrame reduced
    by the occurrences of the chosen digits, creating other computed
    columns

    Args:
        data: pandas Series holding the digits to be counted.
        digs: passed to ``_test_`` to build the frame of expected
            proportions (which must provide an ``Expected`` column).
        limit_N: None or an integer cap on the sample size N, validated by
            ``_set_N_``.
        simple: when True, return only the DataFrame, without the ``Dif``
            column.
        confidence: when not None (and ``simple`` is False), a ``Z_score``
            column is added. Only its presence is checked here.

    Returns:
        The DataFrame alone when ``simple`` is True; otherwise the tuple
        ``(N, DataFrame)``.
    """
    N = _set_N_(len(data), limit_N=limit_N)
    # number of occurrences of each digit
    counts = data.value_counts()
    # relative frequencies, derived from the counts already computed
    # instead of running value_counts a second time; they are taken over
    # the counted records, independently of N
    found = counts / counts.sum()
    observed = DataFrame({'Counts': counts, 'Found': found}).sort_index()
    # join with the expected frequencies; digits never observed get 0
    dd = _test_(digs).join(observed).fillna(0)
    # signed and absolute differences between found and expected
    dd['Dif'] = dd.Found - dd.Expected
    dd['AbsDif'] = dd.Dif.abs()
    if simple:
        del dd['Dif']
        return dd
    if confidence is not None:
        dd['Z_score'] = Z_score(dd, N)
    return N, dd
def subtract_sorted(data):
    """Subtracts the sorted sequence elements from each other, discarding zeros.
    Used in the Second Order test

    Args:
        data: pandas Series of numbers. It is not modified.

    Returns:
        Series with the non-zero differences between consecutive sorted
        values, keeping the index labels of the sorted input.
    """
    # sort_values returns a new Series, so the caller's data is untouched
    ordered = data.sort_values()
    gaps = ordered - ordered.shift(1)
    # The smallest element has no predecessor, so its difference is NaN;
    # drop it, since NaN != 0 would otherwise let it through the filter.
    gaps = gaps.dropna()
    return gaps.loc[gaps != 0]
def prep_to_roll(start, test):
    """Used by the rolling mad and rolling mean, prepares each test and
    respective expected proportions for later application to the Series subset

    Args:
        start: pandas object exposing a ``ZN`` column (presumably a
            DataFrame of integer-converted values -- confirm against the
            callers). A column named ``digs_dict[test]`` holding the
            extracted digits is added to it in place.
        test: 1, 2 or 3 for the first one, two or three digits; 22 for the
            second digit; any other value is handled as the last-two-digits
            test.

    Returns:
        Tuple ``(Exp, ind)``: the array of expected proportions and the
        array of digit values they refer to.
    """
    # NOTE: rows are not filtered here. The original rebound a local copy of
    # `start` restricted to large enough ZN values, but that copy was never
    # returned or used, so it had no effect and has been removed.
    digits_col = digs_dict[test]
    if test in [1, 2, 3]:
        # NOTE(review): for ZN below 10 ** (test - 1) the exponent below is
        # negative -- confirm callers only pass large enough values.
        start[digits_col] = start.ZN // 10 ** ((
            log10(start.ZN).astype(int)) - (test - 1))
        ind = arange(10 ** (test - 1), 10 ** test)
        Exp = log10(1 + (1. / ind))
    elif test == 22:
        start[digits_col] = (start.ZN // 10 ** ((
            log10(start.ZN)).astype(int) - 1)) % 10
        # first-two-digits proportions (10..99), summed by second digit
        two_digits = log10(1 + (1. / arange(10, 100)))
        by_second = DataFrame({'Expected': two_digits,
                               'Sec_Dig': array(list(range(10)) * 9)})
        Exp = by_second.groupby('Sec_Dig').sum().values.reshape(10,)
        ind = arange(0, 10)
    else:
        start[digits_col] = start.ZN % 100
        ind = arange(0, 100)
        # NOTE(review): 100 outcomes at 1/99 each sum to 100/99, not 1 --
        # confirm this matches the expected proportions used elsewhere.
        Exp = array([1 / 99.] * 100)
    return Exp, ind
def mad_to_roll(arr, Exp, ind):
    """Mean Absolute Deviation used in the rolling function

    Args:
        arr: sequence of digit values found in the window.
        Exp: array of expected proportions, one per entry of ``ind``.
        ind: array of the digit values ``Exp`` refers to, in the same order.

    Returns:
        Mean of the absolute differences between found and expected
        proportions.
    """
    found = Series(arr).value_counts(normalize=True)
    # Always align on `ind`: the subtraction below is positional, so
    # reindexing only when digits are missing would pair proportions with
    # the wrong expected values whenever the lengths match but the labels
    # do not. Digits absent from `arr` get 0; labels outside `ind` are
    # dropped (proportions remain normalized over all of `arr`).
    found = found.reindex(ind, fill_value=0)
    return abs(found - Exp).mean()
def mse_to_roll(arr, Exp, ind):
    """Mean Squared Error used in the rolling function

    Args:
        arr: sequence of digit values found in the window.
        Exp: array of expected proportions, one per entry of ``ind``.
        ind: array of the digit values ``Exp`` refers to, in the same order.

    Returns:
        Mean of the squared differences between found and expected
        proportions.
    """
    found = Series(arr).value_counts(normalize=True)
    # Always align on `ind`: the subtraction below is positional, so
    # reindexing only when digits are missing would pair proportions with
    # the wrong expected values whenever the lengths match but the labels
    # do not. Digits absent from `arr` get 0; labels outside `ind` are
    # dropped (proportions remain normalized over all of `arr`).
    found = found.reindex(ind, fill_value=0)
    return ((found - Exp) ** 2).mean()