Accueil Le blog ebiznext
Initiation akka-streams

La programmation réactive avec akka-streams

L’objet de cet article est l’initiation à la programmation réactive avec akka-streams au travers d’un cas d’utilisation réel : la mise en place d’une plate-forme E-Commerce haute-performance avec un coût d’utilisation sur le cloud minimisé. Afin de répondre à cet objectif, nous allons :

  • présenter l’architecture de Jahia Commerce (vue d’avion),
  • identifier les points d’amélioration,
  • décrire la solution mise en oeuvre,
  • détailler l’implémentation avec akka-streams

#Architecture de la solution Jahia Commerce

Architecture jahia commerce basique

Jahia Commerce Factory est un produit de construction de sites E-Commerce de Jahia qui intègre les fonctionnalités suivantes :

  • Jahia Digital eXperience Manager (DXM) : Il s’agit du CMS qui permet de construire en mode WYSIWYG les pages du site d’E-Commerce.
  • Du module Commerce Factory : Module OSGI constitué de l’ensemble des widgets de E-Commerce. Construit autour de l’external data provider, ce module permet une intégration transparente de la base de produits exposée par le moteur de E-Commerce Mogobiz
  • Mogobiz : framework open-source exclusivement disponible en version commerciale via la souscription Jahia et offrant une API REST complète pour l’E-Commerce.
  • ElasticSearch (ES) : Moteur d’indexation des produits du site de E-Commerce.
  • Autres composants : Jahia Commerce intègre également d’autres composants tels qu’un moteur de recommandation, Spark, … TODO

#Points d’amélioration identifiés

La solution Jahia Commerce repose sur le produit Jahia DXM qui offre une richesse de gestion de contenus et de personnalisation. Cependant, le traitement de la restitution de contenus riches peut bénéficier d’optimisations que nous décrivons ci-après.

Dans Jahia DXM, tout est contenu. Une page est donc un contenu intégrant d’autres contenus. Il est donc nécessaire d’effectuer le rendu des contenus de manière récursive pour obtenir la page demandée.

Rendu recursive jahia

Afin d’accélérer le temps de génération, Jahia DXM repose sur un système de cache applicatif, Ehcache, où chaque contenu peut être mis en cache. Ce cache applicatif implique que plus il y a de contenus, plus la quantité de mémoire vive requise est importante si l’on souhaite avoir de bonnes performances.

Cette gestion de cache conduit donc à :

  • de la consommation CPU pour chaque requête HTTP arrivant sur le serveur car des filtres de rendus sont exécutés systématiquement,
  • de la consommation mémoire proportionnelle au nombre de contenus mis à disposition,
  • un délai inhérent au cache applicatif où les éléments mis en cache sont sujets au garbage collector et à la réplication dans un environnement en cluster.

La mise en place d’une telle solution dans le cloud peut avoir un impact financier pour un site de E-Commerce à fort traffic.

Solution mise en oeuvre

Architecture jahia commerce optimise

Afin de limiter la consommation de CPU, de mémoire et d’améliorer par la même occasion l’un des éléments déterminant du SEO qui est le temps de réponse des pages, un cache HTTP est positionné en amont des services Jahia Commerce et Mogobiz. Nous avons implémenté cette solution sur la base d’un cache HTTP Varnish et Nginx.

En mettant en place le cache des requêtes HTTP et les règles de mises en cache appropriées, la majorité des pages consultées ne sollicitent plus les services Jahia Commerce. Une économie sur le temps CPU est donc réalisé à moindre coût, permettant par la même occasion de diminuer drastiquement le temps de réponse. Avec cette architecture, seule la ou les premières visites d’une page nécessitent un traitement par Jahia Commerce.

Il nous reste à régler le délai d’affichage lors du premier accès aux pages du site. Ce problème récurrent dans les applications Web quel qu’elles soient requiert pour sa résolution un pré-chargement des contenus en cache.

Le volume de pages à mettre en cache est proportionnel au nombre de produits exposés qui peut être important. La rapidité d’exécution du batch est donc primordiale.

Pour optimiser les performances de mise en cache, nous avons fait le choix du DSL Akka-Streams, une librairie qui implémente le paradigme de la programmation réactive.

#Présentation d’akka-streams Akka-streams est une librairie open-source développée par TypeSafe, respectant la spécification Reactive Streams. Il s’agit donc d’une librairie implémentant le paradigme de la programmation réactive permettant ainsi le traitement de données en continu. Elle fournit une abstraction du modèle d’acteur akka et permet d’écrire des programmes parallélisables sans se préoccuper des problématiques des threads et de leur synchronisation. La lisibilité du code est également un point fort de cette librairie grâce à son DSL très expressif.

##Principes de la programmation réactive La programmation réactive est un paradigme centré sur la donnée, où celle-ci circule dans un graphe en commençant par une source et en allant vers une cible (sink) tout en passant par différents nœuds de traitement (flow). En combinant cette façon de programmer avec une programmation fonctionnelle, ce paradigme favorise grandement :

  • la parallélisation des traitements à moindre effort,
  • la création d’un programme responsive sans callback hell,
  • la lisibilité du code et donc la maintenabilité de celui-ci.

##Définition des concepts d’akka-streams ###Les différents nœuds de traitement Voici la liste des différents types de nœuds existant dans akka-streams :

  • d’un ou plusieurs nœuds sources : il s’agit d’un nœud dont la particularité est l’absence d’entrée. Il s’agit d’un nœud producteur de données qui alimente le graphe.
  • De plusieurs nœuds de traitement (flows) :
  • simples : comporte une entrée et une sortie. La donnée circulant à travers ce nœud peut être altérée,
  • séparateurs : comporte une entrée et plusieurs sorties. Ce type de nœud permet soit de dupliquer la donnée sur les différentes sorties, soit d’aiguiller la donnée ou bien paralléliser tout simplement le traitement des données en balançant sur différents nœuds.
  • agrégateurs : comporte plusieurs entrées et une sortie. Ce type de nœud permet soit d’agréger l’ensemble des entrées vers une sortie, soit d’effectuer une opération avec l’ensemble des entrées pour produire une donnée en sortie.
  • De nœuds composites (Partials Graphs) : il s’agit d’un nœud regroupant différents nœuds. Formant ainsi un graphe partiel.
  • d’un ou plusieurs nœuds sinks : il s’agit d’un nœud dont la particularité est l’absence de sortie. C’est un nœud terminant le graphe.

Le lien entre ces différents nœuds correspond à des arcs. Dans akka-streams, ces arcs sont représentés par la déclaration des raccordements entre les sorties et les entrées.

Un graphe nécessite a minima une source et une cible. La raccordement des nœuds forment un graphe partiel ou fermé. Un graphe est dit fermé si et seulement s’il n’existe aucun nœud avec une sortie non connectée.

Chacun de ces nœuds, à l’exception du nœud source, possède une zone tampon où les données sont stockées dans une file. Cette zone tampon permet de gérer le back-pressure.

###Back-pressure Le back-pressure correspond à une stratégie à appliquer sur la zone tampon d’un nœud. Cette gestion permet de stabiliser l’application et de maîtriser la quantité de données transitant dans le graphe. Cette stratégie est utile lorsque le producteur produit plus de données que ce que le consommateur est en capacité de gérer.

Akka-streams implémente une version de back-pressure asynchrone et non-bloquante respectant les spécifications de Reactive Streams. La configuration du back-pressure se fait sur les nœuds et permet d’appliquer une stratégie sur la zone tampon à exécuter lorsque le tampon est plein. Ces différentes stratégies sont :

  • de remplir la zone tampon à la demande lorsque l’on est capable de gérer la quantité de données générées par le producteur.
  • de gérer le tampon plein alors que de nouveaux éléments continuent d’arriver, en supprimant :
  • le nouvel élément
  • l’élément le plus ancien
  • l’élément le plus récent
  • toute la zone tampon
  • d’arrêter l’exécution du graphe avec un statut en échec.

#Implémentation de la solution

Cache dataflow

Ce schéma illustre de manière macroscopique le dataflow implémenté. L’implémentation a été développée en Scala et se repose sur les librairies suivantes :

  • akka-stream: utilisé pour la programmation réactive
  • spray : utilisé pour le client HTTP
scala 
Source(c.getEsIterator().toStream).map ( hit => {
		c.fields.map(hit.getOrElse(_, List()))
	  })

Etape 1 : Un itérateur (1a) sur les données en provenance d’ElasticSearch est créé afin d’alimenter le dataflow. Pour chacun des champs demandé dans la requête ES, une donnée sous forme de liste et contenants les champs demandés d’un document est retournée (1b).

scala
//remove all the hits that doesn't have all fields
.filter{ fields =>
	val result: Boolean = fields.filter(!_.isEmpty).length == c.fields.length
	if(!result){
	  logger.warn("One hit doesn't have all required fields")
	}
	result
}
// encode fields
.map {fields =>
  fields.zip(c.encodeFields).map {
	case (f, true) => URLEncoder.encode(f,StandardCharsets.UTF_8.name())
	case (f, _) => f
  }
}
// currently keep the first value in case where a field has multiple value.
.map{ fields =>
  fields.map(l => l(0))
}

Etape 2 : Une fois ces données récupérées, nous ne traitons que les documents ES ayant l’ensemble des champs demandés. Il est possible de faire évoluer le code pour autoriser l’absence de champs.

Dans le cas où il existe plusieurs entrées, seule la première valeur est maintenue. Il s’agit d’un choix subjectif car la gestion de la valeur multiple n’a pas été rencontrée.

Une fois ces champs récupérés, un encodage des champs spécifiés est réalisé avant de construire l’URL cible.

scala
.mapAsyncUnordered(h.maxClient) { (fields: List[String]) => {
  val fullUri: String = h.getFullUri(fields)
  logger.info(s"Requesting ${h.method} ${fullUri}")
  val request: HttpRequest = HttpRequest(h.method,fullUri,buildHeaders(h))
  pipeline(request).flatMap(httpResponse => Future {
	(fullUri, httpResponse)
  })
}
}.toMat(Sink.ignore)(Keep.right)

Etape 3 : On effectue l’appel HTTP en construisant l’URL à partir des champs du document ES récupéré. Il s’agit d’une étape lente comparée aux traitements en amont. Afin d’améliorer les performances de cette étape, une parallélisation des requêtes HTTP de manière asynchrone est effectuée grâce à la méthode mapAsyncUnordered. Le degré de parallélisme est défini par le paramètre de cette méthode. Nous pouvons souligner ici la facilité de rendre son code parallèle avec l’API d’akka-streams.

#Limitation de la solution La solution envisagée avec akka-streams a néanmoins une limite: l’absence de la distributivité. Il n’est donc pas envisageable de traiter un gros volume de données en un temps réduit à moins de trouver des solutions de contournement comme le partitionnement de différentes exécutions sur différentes machines ou bien de restreindre la taille des données entrantes.

Si la distributivité est un élément clé, il est possible de distribuer les traitements en couplant Akka à Spark habituellement appliqué dans un contexte BigData.