Objectif

Lors de traitement asynchrone, nous pouvons traiter deux messages traitant un même objet métier. Pour éviter les problèmes de concurrence, l’objectif est de réaliser une exclusion de traitement. Pour réaliser cette exclusion, on va utiliser les locks offerts par PostgreSQL et laisser JPA utiliser le SELECT FOR UPDATE.

Séquence d’acquisition du lock

Pour un traitement unitaire, l’objectif est de récupérer le lock en base de données. Si le lock n’existe pas alors, nous allons le créer.

sequenceDiagram 
    participant M as messaging
    participant T1 as Thread 1
    participant PG as Postgresql
    M -)+ T1: receive message
    T1 ->> PG : acquireLock()

    alt le lock n'existe pas en base de données
    PG --x T1: empty
    critical nouvelle Transaction
    T1 ->> PG: createLock()
    PG -->> T1: created
    end
    T1 ->> PG : acquireLock()
    end
    PG-->>T1: lock

    note right of T1: traitement métier

    T1 ->> PG: commit
    T1 --)- M: ack message

Cette séquence permet de limiter le nombre de requêtes en base de données si on lock souvent sur le même objet.

Dans le cas d’un traitement en parallèle, le deuxième thread va demander le lock et va attendre la fin du traitement du thread 1 pour récupérer le lock.

sequenceDiagram 
    actor M as messaging
    participant T1 as Thread 1
    participant PG as Postgresql
    participant T2 as Thread 2

    M -)+ T1: receive message
    T1 ->> PG : acquireLock()
    alt le lock n'existe pas en base de données
    PG --x T1: empty
    critical nouvelle Transaction
    T1 ->> PG: createLock()
    PG -->> T1: created
    end
    T1 ->> PG : acquireLock()
    end
    PG-->>T1: lock

    M -)+ T2: receive message
    T2 ->> PG: acquireLock()

    note right of T1: traitement métier

    T1 ->> PG: commit

    PG -->> T2: lock

    T1 --)- M: ack message

    note right of T2: traitement métier

    T2 --)- M: ack message

Le fonctionnement du SELECT FOR UPDATE, on indique à Postgresql qu’on va modifier une ligne dans la table. Et au commit de la transaction, la base de données va release le lock pour redonner la main au deuxième thread.

Limite de la gestion du lock

Ce système ne gère pas bien une concurrence trop importante, puisque nous bloquons le thread pour acquérir le lock. Ce qui provoque un ralentissement du temps de traitement, le thread attend au lieu de traiter un autre message.

Il faut gérer l’échec lors de la création d’un lock en concurrence.

sequenceDiagram 
    participant M as messaging
    participant T1 as Thread 1
    participant PG as Postgresql
    participant T2 as Thread 2

    M -)+ T1: receive message
    M -)+ T2: receive message
    T1 ->> PG : acquireLock()
    T2 ->> PG: acquireLock()
    PG --x T1: empty
    PG --x T2: empty
    
    note over T1,PG: new Transaction
    T1 ->> PG: createLock()
    note over T2,PG: new Transaction
    T2 ->> PG: createLock()
    PG -->> T1: created
    note over T1,PG: end Transaction
    
    PG --x T2: duplicateKey
    note over T2,PG: end Transaction
    
    T1 ->> PG : acquireLock()
    T2 ->> PG : acquireLock()
    PG-->>T1: lock

    note right of T1: traitement métier

    T1 ->> PG: commit

    PG -->> T2: lock

    T1 --)- M: ack message

    note right of T2: traitement métier

    T2 --)- M: ack message

Autre limite, on ne gère pas le cas d’un timeout lors de la récupération du lock.