<center><h1>Données et encombrement mémoire</h1></center>


<center><h2>- la riposte graduée avec Python -</h2></center>


<center><h3> Christian Poli, SED Inria Saclay</h3></center>
<center><h4> 6 décembre 2018</h4></center>

### Constat :

* Quand les données tiennent confortablement en mémoire, le choix des outils et de méthodes est très riche (Numpy, Pandas etc.)
* Quand le passage à l'échelle s'impose, le choix est plus restreint, mais les bons outils ne manquent pas (Dask, Spark etc.).
* Entre les deux, il y a une zone grise ...

### Objectif :

* faire un tour d'horizon des quelques outils et pratiques à envisager avant d'avoir recours aux (trop?) grands moyens ...


### Questions liées à la nature du traitement :
* connaissance à priori ou non des volumes des données à traiter
* possibilité ou non de travailler sur des échantillons
* données creuses ou pas

### Il n'y a pas des petites économies ...

* Supprimer les références parasites aux données (attention aux variables globales et aux références circulaires!)
* Préférer les numpy array aux listes pour les données conséquentes (surtout numériques)
* Avec Numpy et Pandas, préférer le slicing au "fancy indexing"

In [None]:
import numpy as np
import sys

arr = np.arange(1000)
a1 = arr[range(100)] # fancy indexing
print("Fancy indexing: ", sys.getsizeof(a1))
a2 = arr[:100] # slicing
print("Slicing: ", sys.getsizeof(a2))
print("a1 == a2", all(a1 == a2))

* Ne pas negliger le typage des donnée avec Numpy et Pandas (voir exemple)

In [None]:
"""
By default data typing on load
"""
import pandas as pd
import os
MB = 2**20
# data are available here
# https://data.srcity.org/api/views/ynv7-pkkm/rows.csv?accessType=DOWNLOAD
print("File size (Mb): {:03.2f}".format( 
      os.stat('../data/srcity-weather.csv').st_size/MB))

df = pd.read_csv('../data/srcity-weather.csv')
df.info(memory_usage='deep', max_cols=0)

def avg_mem_usage(df):
    """
    Displays mem.usage in a dataframe by dtype
    """
    for dt in set(df.dtypes): 
        seld_dt = df.select_dtypes(include=[dt])
        b = seld_dt.memory_usage(deep=True).mean() 
        bpcell = b//len(df) + 1
        mb = b/ MB
        print(f"Memory usage (mean values) for {dt} "
              f"cols: {mb :03.2f}"
              f" MB, {bpcell} bytes/cell")
avg_mem_usage(df)
#df.head()


In [None]:
"""
Downcast num. data types on load
"""
import numpy as np
downcast = {'float64': np.float32, 'int64': np.int16,'object': object}
dc_types = {col: downcast[dt.name] for  (col, dt) in df.dtypes.items()}

df2 = pd.read_csv('../data/srcity-weather.csv', dtype=dc_types)
print("*** Memory usage before downcasting num. cols ***")
avg_mem_usage(df)

print("*** Memory usage after downcasting num. cols ***")
avg_mem_usage(df2)
df2.info(memory_usage='deep', max_cols=0)

In [None]:
"""
Non-numerical (i.e. object) columns
"""
object_cols = df2.dtypes[df2.dtypes==object]
print("*** Object cols ***")
print(object_cols)

In [None]:
df2.head()[object_cols.index]

In [None]:
"""
date cols parsing on load
"""
df3 = pd.read_csv('../data/srcity-weather.csv', dtype=dc_types, 
                  parse_dates=['RowID','DateTime'])
avg_mem_usage(df3)
df3.info(memory_usage='deep', max_cols=0)

In [None]:
df3.head()[object_cols.index]

In [None]:
"""
Les données catégorielles
"""
dc_types['WindDir'] = 'category'
dc_types['HiDir'] = 'category'
df4 = pd.read_csv('../data/srcity-weather.csv', dtype=dc_types, 
                  parse_dates=['RowID','DateTime'])
avg_mem_usage(df4)
df4.info(memory_usage='deep', max_cols=0)

In [None]:
df4.head()[object_cols.index]

### Les traitements par lots avec Pandas

Supposons qu'on veut calculer les valeurs max sur toutes les variables flottantes du dataframe précédent, ce qui n'est pas bien compliqué en principe ...

In [None]:
df = pd.read_csv('../data/srcity-weather.csv')
float_cols = df.dtypes[df.dtypes=='float']
max_vals = df[float_cols.index].max()
print("*** Max values for all float variables ***")
print(max_vals)

... mais les choses se compliquent un peu quand le volume de données ne tient pas en mémoire.
Dans ce cas, une alternative est le traitement "par batch", en découpant les données en fragments via l'option **chunksize** de la fonction read_csv() :

In [None]:
"""
Batch processing with Pandas
"""

max_vals_incr = pd.Series(index=float_cols.index, data=-np.inf)
it = pd.read_csv('../data/srcity-weather.csv', chunksize=10000) # TextFileReader
print("Iterator type: ", type(it))
for dfi in it:
    max_vals_incr = np.maximum(max_vals_incr, 
                               dfi[float_cols.index].max())
print("*** Max float values (batch mode)***")
print(max_vals_incr)

### Autres options remarquables de DataFrame.read_csv() :

* **usecols** : sélection de colonnes
* **nrows** : nombre de lignes à importer
* **skiprows** : nombre (ou liste ou fonction) de lignes à sauter => outil de sampling

**NB:** La puissance de Pandas la rend très utile comme "couteau suisse", y compris quand d'autres solutions de stockage sont envisagées.

### Le mapping des fichiers en mémoire

Hélas, tous les algorithmes ne se prêtent pas aussi bien à une implémentation par batch (exemple: calcul des quantiles, tri etc.) ...

Une stratégie possible est le mapping en mémoire, basée sur la fonction système **mmap** qui utilise efficacement les mécanismes de pagination de l'OS. Le mapping permet d'accéder des petits fragments du fichier en mémoire sans tout charger en même temps.
Exemple :

In [None]:
import numpy as np
nb_rows = len(df)
filename = 'weather.dat'
fp = np.memmap(filename, dtype='float32', mode='w+', 
               shape=(nb_rows,len(float_cols)))
it = pd.read_csv('../data/srcity-weather.csv', 
                 chunksize=10000, usecols=float_cols.index)
offs = 0
for dfi in it:
    fp[offs:offs+len(dfi), :] = dfi
    offs += len(dfi)
print("*** First row in fp ***")
print(fp[:1,:])
print("*** First row in the initial dataframe ***")
print(df.loc[:1,float_cols.index])
print("*** Last row in fp ***")
print(fp[nb_rows-2:,:])
print("*** Last row in the initial dataframe ***")
print(df.loc[nb_rows-2:,float_cols.index])

### Le format HDF5

* standardisé et portable
* nombre réduit de fichiers
* compression transparente, chunking etc.
* nombreux outils, fonctionnalités ...
* module Python **h5py**

Les objets HDF5 :

* Le groupe
    - liste d'attributs
    - liste d'objets HDF5, groupes compris (pattern composite)
* Le dataset
    - datatype (equiv. dtype de Numpy)
    - dataspace (dimensions)
    - stockage (d'un seul tenant ou fragmenté) 
* L'attribut
    - métadonnées
    - valeur + datatype

**NB: ** L'interface se fait via des Numpy arrays

In [None]:
import h5py
import numpy as np
with h5py.File("weather.hdf5", "w") as f:
    dset = f.create_dataset("srweather", (nb_rows,len(float_cols)),
                            dtype='float32')
    offs = 0
    it = pd.read_csv('../data/srcity-weather.csv', 
                     chunksize=10000, usecols=float_cols.index)
    for dfi in it:
        #print(dfi.info(memory_usage='deep', max_cols=0))
        dset[offs:offs+len(dfi), :] = dfi
        offs += len(dfi)
    print("*** First row in dset ***")
    print(dset[:1,:])
    print("*** First row in the initial dataframe ***")
    print(df.loc[:1,float_cols.index])
    print("*** Last row in dset ***")
    print(dset[nb_rows-2:,:])
    print("*** Last row in the initial dataframe ***")
    print(df.loc[nb_rows-2:,float_cols.index])

**NB:** Il existe un conteneur de type "dataframe" ou "table" basé sur HDF5 appelé [PyTables](https://www.pytables.org/) et interfacé avec **Pandas**

### Quelques conteneurs alternatifs

* Les objets [bcolz](https://bcolz.readthedocs.io/en/latest/)
* Les [roaring bitmaps](https://pyroaringbitmap.readthedocs.io/en/latest/)
* Les [matrices creuses](https://docs.scipy.org/doc/scipy/reference/sparse.html)
* Le conteneur [Zarr](https://zarr.readthedocs.io/en/stable/)

### Les objets bcolz

* orientés colonne
* compression en mémoire à la volée via [Blosc](http://blosc.org/)
* propose deux conteneurs : **carray** et **ctable**

In [None]:
import bcolz
N = 2**20
exp_size = N*(4+8)
ct = bcolz.fromiter(((i,i*i) for i in range(N)), 
                    dtype="i4,f8", count=N)
act_size = sys.getsizeof(ct)
print("Expected size (MB): {:03.2f}".format(exp_size/MB))
print("Actual size of ct (MB): {:03.2f}".format(act_size/MB))
print("Default compression ratio: {:03.2f}".format(exp_size/act_size))
ct = bcolz.fromiter(((i,i*i) for i in range(N)), 
                    dtype="i4,f8", count=N, 
                    cparams=bcolz.cparams(clevel=9))
act_size = sys.getsizeof(ct)
print("Higher compression ratio: {:03.2f}".format(exp_size/act_size))


### Les roaring bitmaps

* traitement efficace des ensembles d'entiers 32 bits :
    - très faible occupation mémoire
    - rapidité
* les entiers membres sont triés par ordre croissant
* interface très proche de celle des **sets** de la lib standard complétée par des fonctions spécifiques aux données numériques (min, max etc.)
* plus de [précisions sur l'algorithme](https://github.com/RoaringBitmap/RoaringBitmap)

In [None]:
from pyroaring import BitMap

bm1 = BitMap(range(10))
bm2 = BitMap(range(5, 15))

print("Union : ", bm1 | bm2)
print("Intersection: ", bm1 & bm2)
print("Difference: ", bm1 - bm2)


### Le conteneur Zarr

* destiné au tableaux multi-dimensionnels
* tout comme bcolz utilise le métacompresseur Blosc
* permet le chunking multi-dimensionnel
* [documentation](https://zarr.readthedocs.io/en/stable/)

### Les matrices creuses

Le module **scipy** implémente plusieurs types de matrices creuses:

* **CSC:** Compressed Sparse Column matrix
* **CSR:** Compressed Sparse Row matrix
* **LIL:** Row-based linked list sparse matrix
* **DOK:** Dictionary Of Keys based sparse matrix

### Dask

* bibliothèque de calcul parallèle
* exécution diférée 
* décompose les gros traitements en petites tâches organisées en graphe orienté acyclique et ordonancées par un scheduler => bien adapté pour les traiements out-of-core
* propose trois types de conteneurs :
    - array
    - dataframe
    - bag

**NB:** Les [arrays](http://docs.dask.org/en/latest/array.html) et [dataframes](http://docs.dask.org/en/latest/dataframe.html) de Dask sont moins riches en fonctionnalités que celles de Numpy et Pandas

Exemple de traitement :

In [None]:
from dask import dataframe as dd
ddf = dd.read_csv('../data/srcity-weather.csv', 
                  usecols=float_cols.index)
result = ddf.mean()
result.dask

In [None]:
result.visualize()

In [None]:
mean_centered = ddf - ddf.mean()
mean_centered.dask

In [None]:
mean_centered.visualize()

In [None]:
result.compute()

In [None]:
mean_centered.compute()

### Quand utiliser (ou pas) Dask ?

Quelques pistes sont fournies par la doc :

* pour **Dask Array** voir [Scope](http://docs.dask.org/en/latest/array.html#scope)
* pour **Dask Dataframe** voir [Common Uses and Anti-Uses](http://docs.dask.org/en/latest/dataframe.html#common-uses-and-anti-uses)


<center><h1>Affaire à suivre ...</h1></center>



<center><h1>Merci!</h1></center>