Ramit Mittal

Leader elections with Postgres advisory locks

Modern container orchestration solutions like K8s make it very easy to run multiple instances of an application. If you’re running stateless API servers that simply respond to HTTP requests, you can scale out to your heart’s content. But certain applications cannot simply be scaled out. Take an application that runs scheduled cron jobs: if every instance runs every job, each job executes once per instance. Ideally, you would want one instance to become the leader and either run the cron jobs itself or distribute the tasks among all instances.

Electing leaders by locking resources

The easiest way to elect a leader among multiple instances is to make all of them share a resource (a disk or a database). Every instance attempts to obtain a lock on the resource. This could mean writing a file to disk or inserting a record in the database. The instance that succeeds becomes the leader. Redis is often used for obtaining locks and synchronizing operations.
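
To make the idea concrete, here is a minimal sketch of the "write a file to disk" variant using only the Go standard library. The helper names (tryBecomeLeader, resign) and the lock file path are made up for illustration, and the sketch assumes all instances see the same file system. Creating the file with O_EXCL fails if the file already exists, so only one instance can win.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// tryBecomeLeader attempts to create lockPath exclusively.
// It returns true if this caller created the file and so won the election,
// and false if another instance had already created it.
// Hypothetical helper for illustration only.
func tryBecomeLeader(lockPath, instanceID string) (bool, error) {
	// O_EXCL makes the create fail when the file already exists.
	f, err := os.OpenFile(lockPath, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o644)
	if errors.Is(err, fs.ErrExist) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	defer f.Close()

	// Record who holds the lock; useful when debugging.
	if _, err := f.WriteString(instanceID); err != nil {
		return false, err
	}
	return true, nil
}

// resign removes the lock file so that another instance can win the next election.
func resign(lockPath string) error {
	return os.Remove(lockPath)
}

func main() {
	dir, err := os.MkdirTemp("", "election")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	lockPath := filepath.Join(dir, "leader.lock")

	// Two instances compete for the same lock file; only the first one wins.
	for _, id := range []string{"instance-a", "instance-b"} {
		won, err := tryBecomeLeader(lockPath, id)
		fmt.Println(id, "leader:", won, "error:", err)
	}
}
```

Note that nothing removes the file if the leader crashes, so a real implementation along these lines would also need some way to detect a stale lock.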

Electing leaders with advisory locks

Running leader elections is very simple if your application is already using Postgres. The advisory lock feature of Postgres is very well-suited for this.

  • Choose an arbitrary ID, say 10.
  • All instances try to acquire a lock on this ID using the pg_try_advisory_lock() function.
  • The instance that succeeds becomes the leader.
  • All other instances retry acquiring the lock at a fixed interval.
  • The leader verifies that it is still leading by querying the lock at a fixed interval.

The following Go program implements these steps:

package main

import (
	"context"
	"log"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
)

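// pool is assumed to be initialised elsewhere (for example with pgxpool.Connect)
// before runElectionLoop is called
var pool *pgxpool.Pool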

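// leaderTasks starts the jobs that only the leader should run;
// they must stop once ctx is cancelled
func leaderTasks(ctx context.Context) {
}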

func runElectionLoop() {
	for {
		leaderStatusC := make(chan bool)
		go runElection(leaderStatusC)

		// if first message on channel is true then current instance is leader
		if isLeader := <-leaderStatusC; isLeader {
			// start interval based tasks when pod becomes leader
			ctx, cancel := context.WithCancel(context.Background())
			// run in a goroutine so that long-running tasks cannot block the wait below
			go leaderTasks(ctx)

			// second message on channel will be sent when current pod is no longer the leader
			<-leaderStatusC

			// cancel all interval based jobs when no longer leader
			cancel()
		}

		// wait for 1 minute before retrying election
		<-time.After(1 * time.Minute)
	}
}

func runElection(leaderStatusC chan<- bool) {
	ctx := context.TODO()

	defer func() {
		leaderStatusC <- false
	}()

	conn, err := pool.Acquire(ctx)
	if err != nil {
		log.Default().Printf("failed to acquire connection: %s", err.Error())
		return
	}
	defer conn.Release()

	acquireLockQ := "SELECT pg_try_advisory_lock(10)"

	var becameLeader bool
	if err := conn.QueryRow(ctx, acquireLockQ).Scan(&becameLeader); err != nil {
		log.Default().Printf("leader election failed with error: %s", err.Error())
		return
	}
	if !becameLeader {
		log.Default().Printf("leader election failed")
		return
	}

	log.Default().Printf("new leader")
	leaderStatusC <- true

	for {
		<-time.After(1 * time.Minute)

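		ctx2, cancel := context.WithTimeout(context.Background(), 5*time.Second)

		// ensure that an advisory lock is held on the ID 10 by the same connection as the one running this query
		checkLockQ := "SELECT count(*) FROM pg_locks WHERE pid = pg_backend_pid() AND locktype = 'advisory' AND objid = 10"

		var lockCount int
		lockCountErr := conn.QueryRow(ctx2, checkLockQ).Scan(&lockCount)
		cancel() // release the timeout's resources as soon as the query returns
		if lockCountErr != nil {
			// a failed check is treated as a lost lock, since the connection may be gone
			log.Default().Printf("no longer leader: %s", lockCountErr.Error())
			break
		}
		if lockCount == 0 {
			log.Default().Printf("no longer leader: advisory lock is not held")
			break
		}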
	}
}
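
The leaderTasks function is left empty in the program above. As a rough sketch of what could go there, the hypothetical runEvery helper below runs a job on a ticker and returns as soon as the context is cancelled, which is what the election loop relies on when leadership is lost. The names runEvery and demo, and the intervals, are assumptions for illustration only.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls task once per interval until ctx is cancelled.
// Hypothetical helper: neither the name nor the signature comes from the program above.
func runEvery(ctx context.Context, interval time.Duration, task func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// leadership was lost or shutdown was requested
			return
		case <-ticker.C:
			task()
		}
	}
}

// demo pretends to be the leader for a short while and reports
// how many times the job ran before the context was cancelled.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	runs := 0

	go func() {
		defer close(done)
		runEvery(ctx, 20*time.Millisecond, func() { runs++ })
	}()

	time.Sleep(110 * time.Millisecond) // leader for roughly five ticks
	cancel()                           // leadership lost
	<-done                             // runEvery has returned, so runs is safe to read
	return runs
}

func main() {
	fmt.Println("job ran", demo(), "times before leadership was lost")
}
```

A leaderTasks built this way could start one such goroutine per scheduled job and return immediately, leaving the cancel() call in the election loop to stop them all.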

Postgres handles lock expiration for us. A session-level advisory lock is released automatically when the connection holding it is closed, so another instance can become the new leader if the leader instance crashes or loses network connectivity. In the network case, the lock is only freed once Postgres notices that the connection is gone.

I have used the pgx library for the database connection pool in the example above; the code that connects to the database is not included in the sample. Postgres treats every connection in a pool as a separate session with its own backend PID, and the lock belongs to the session that acquired it. You must therefore use the same connection every time you check the leader status, which is why runElection holds on to a single connection from pool.Acquire() until it returns.
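
The session-scoped behaviour described above can be illustrated without a database. The following is a toy in-memory model, not Postgres and not pgx: lockTable, tryLock, holds and closeSession are hypothetical names, and the integer PIDs stand in for backend PIDs. It also ignores the fact that Postgres counts repeated acquisitions by the same session.

```go
package main

import (
	"fmt"
	"sync"
)

// lockTable is a toy stand-in for the Postgres advisory lock table.
// It maps a lock ID to the PID of the session that holds it.
type lockTable struct {
	mu      sync.Mutex
	holders map[int64]int
}

func newLockTable() *lockTable {
	return &lockTable{holders: make(map[int64]int)}
}

// tryLock mimics pg_try_advisory_lock: it never blocks and reports
// whether the session identified by pid now holds the lock.
func (t *lockTable) tryLock(pid int, id int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if holder, held := t.holders[id]; held && holder != pid {
		return false
	}
	t.holders[id] = pid
	return true
}

// holds mimics the pg_locks check: does this particular session hold the lock?
func (t *lockTable) holds(pid int, id int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	holder, held := t.holders[id]
	return held && holder == pid
}

// closeSession mimics a connection being closed: every lock it held is released.
func (t *lockTable) closeSession(pid int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for id, holder := range t.holders {
		if holder == pid {
			delete(t.holders, id)
		}
	}
}

func main() {
	locks := newLockTable()

	fmt.Println(locks.tryLock(101, 10)) // true: session 101 becomes the leader
	fmt.Println(locks.tryLock(102, 10)) // false: another connection cannot take the lock
	fmt.Println(locks.holds(102, 10))   // false: checking from the wrong connection looks like "not leader"

	locks.closeSession(101)             // the leader's connection goes away
	fmt.Println(locks.tryLock(102, 10)) // true: a new leader can be elected
}
```

The third call is the one to watch for in practice: a leader that checks its status over a different pooled connection would wrongly conclude that it has lost the lock.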

Further reading