TP Jointure et partionnement sur Hadoop

Remarque: dans ce TP, ont souhaite traiter de grand volumes de données, ce qui signifie des temps de traitement longs. Ci-dessous, quelques conseils pour y faire face:

  • Tester vos traitements (chargements et requêtes) sur de petits volumes d'abord (par exemple 2 fichiers pour chaque table)
  • Utilisez un utilitaire comme screen (documentation) qui permettra de continuer les requêtes tout en étant déconnecté du master (pour l'installer, lancer la commande suivante sur le master: sudo apt-get install screen).

Modalités de rendu

Ce TP est à rendre pour le dimanche 23 octobre 2016. Il est demandé de déposer un rapport sur tomuss dans la case rendu_hadoop1) qui comprendra:

  • Une description succinte (quelques lignes) du jeu de données tel que vous le comprenez
  • Le script permettant de charger les données dans le cluster Hadoop pour le cas sans partition
  • Le code utilisé pour exécuter en map/reduce la requête dans le cas sans partionnement préalable
  • Le code map/reduce utilisé pour dénombrer le nombre maximum de source par objet.
  • Le script permettant de charger les données en les partionnant au préalable. On prendra soins d'expliciter le critère de partionnement et les transformations éventuelles à opérer sur les données
  • Le code utilisé pour exécuter en map/reduce la requête dans le cas avec une partion préalable

Données et requête

On considère le jeu de données du TP précédent, mais dans son ensemble, c'est-à-dire les deux tables Source et Object et tous les fichiers de données de ces tables. Une description plus détaillée de ce schéma est disponible ici.

On souhaite répondre à la question suivante: calculer pour chaque objet observé avant le point temporel (earliestObsTime) 50980.0 50985.0 le nombre d'observations (i.e. de tuples de la table Source) ainsi que la moyenne de la mesure sourceWidth_SG flux_Gaussian:

SELECT o.objectId, count(*) as cnt, avg(flux_Gaussian) flxG_avg
FROM Source s join Object o ON s.objectId = o.objectId
WHERE earliestObsTime <= 50985.0 -- AND s.objectId IS NOT NULL
GROUP BY o.objectId

Il y a deux possibilités pour répondre à cette requête: effectuer une jointure en utilisant la valeur de l'attribut de jointure (objectId) comme clé lors du shuffle ou effectuer la jointure au niveau du map (ce qui suppose que les données concernées par cette jointure se trouvent dans le même noeud). Dans ce TP on va d'abord implémenter la première puis la seconde version.

Si on se limite au contenu des fichiers Source-001 et Object-001, on doit obtenir 152 ou 153 résultats.

Jointure via la clé de shuffle/reduce

Chargement des données

Écrire un script de chargement des données de la machine data dans le HDFS. Pour les besoins de ce TP on ne fera pas de réplication. Changer le nombre de réplicats peut se faire via hadoop fs -setrep 1 fichier_ou_rep (c.f. documentation). Ce script se contentera de récupérer sur votre master les fichiers sur la machine data, les décompresser et les charger dans le HDFS. Attention, il faut faire ces opérations pour un fichier puis effacer ce dernier de votre master avant de passer au fichier suivant sous peine de saturer le disque du master.

Requête

Créer des classes pour les opérations de map et reduce nécessaires pour répondre à la requête du TP via une jointure dans le shuffle/reduce. On rappelle les points suivants:

  • La clé de sortie du map et du reduce est la valeur de l'attribut de jointure
  • Les tuples des deux tables doivent faire partie du même jeu de données
  • Il faut distinguer les tuples provenant de chacune des tables dans la phase de reduce

Réfléchir au bon moment (i.e. map ou reduce ?) pour effectuer le filtre (partie WHERE dans la requête).

Lancer la requête et évaluer le temps de calcul.

Partionnement préalable

Avant de pouvoir procéder à la suite, il est nécessaire d'avoir la réponse à la requête suivante (nombre maximum de source par objet):

SELECT MAX(cnt)
FROM (SELECT objectId, count(*) as cnt
      FROM Source
      WHERE objectId IS NOT NULL
      GROUP BY objectId)

Créer un ou polusieurs jobs map/reduce pour répondre à cette question.

Supprimer ensuite le jeu de données du HDFS afin de faire de la place pour en charger une version partionnée à la main. On souhaite faire maintenant faire la jointure dans le map. Cela ne peut fonctionner que si tous les tuples à combiner sont traités par le même mapper, c'est-à-dire se situent sur le même noeud. Dans HDFS cela signifie qu'ils doivent se trouver dans le même fichier de données, celui-ci devant avoir une taille inférieure à la taille d'un bloc (64 Mo). Une solution pourrait être de créer un fichier par valeur de l'attribut de jointure, mais on risque de tomber dans le “small files problem ” (voir ici par exemple). Il faut donc trouver un compromis entre les deux extrêmes suivants:

  • Tout mettre dans un seul fichier. Cela revient à ne pas partionner explicitement, ce qui ferait échouer la jointure dans le map car les tuples à joindre se retrouveraient éclatés sur plusieurs noeuds).
  • Faire un fichier par valeur de l'attribut de jointure, ce qui amènerait à créer trop de petits fichiers.

Il faut donc regrouper au sein d'un même fichier plusieurs valeurs de l'attribut de jointure.

Jointure dans le map

Une fois la partition effectuée coder et lancer un job map/reduce pour répondre à la requête en effectuant la jointure dans le mapper. Il peut être nécessaire de munir la classe du mapper de champs permettant de mémoriser les tuples déjà rencontrés. Pour cela, il peut être utile de gérer le démarrage et la fin d'une tâche en surchargeant les méthodes setup et cleanup de la classe Mapper

De même que pour la première version, évaluer le temps nécessaire pour répondre à la requête.

Attention cependant, le TextInputFormat (utilisé dans le main) utilisé ne garanti pas que le fichier soit intégralement lu par le même mapper. Il faut pour cela créer une sous-classe de TextInputFormat en surchargeant la méthode isSplitable de façon à toujours renvoyer false (cf doc)

1)
un rapport par binôme, le fichier étant déposé automatiquement pour les deux étudiants