5

Captura de datos con Postgres y Elixir

 1 year ago
source link: https://www.erlang-solutions.com/blog/captura-de-datos-con-postgres-y-elixir/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Captura de datos con Postgres y Elixir

La captura de datos es el proceso de identificar y capturar cambios de datos en la base de datos.

Con captura de datos, los cambios en los datos pueden ser rastreados casi en tiempo real, y esa información puede ser utilizada para apoyar una variedad de casos de uso, incluyendo auditoría, replicación y sincronización.

Read the English version.

La captura de datos es el proceso de identificar y capturar cambios de datos en la base de datos.

Con captura de datos, los cambios en los datos pueden ser rastreados casi en tiempo real, y esa información puede ser utilizada para apoyar una variedad de casos de uso, incluyendo auditoría, replicación y sincronización.

Un buen ejemplo de un caso de uso para captura de datos es considerar una aplicación que inserta un registro en la base de datos y envía un evento a una cola de mensajes después de que se ha insertado el registro (escribir dos veces).

Imagina que estás trabajando en una aplicación de comercio electrónico y después de que se crea y se inserta un pedido en la base de datos, se envía un evento OrderCreated a una cola de mensajes. Los consumidores del evento podrían hacer cosas como crear órdenes de recolección para el almacén, programar transportes para la entrega y enviar un correo electrónico de confirmación del pedido al cliente.

Pero ¿qué sucede si la aplicación se bloquea después de que se ha insertado el pedido en la base de datos pero antes de lograr enviar el evento a la cola de mensajes? Esto es posible debido al hecho de que no se puede insertar atómicamente el registro Y enviar el mensaje en la misma transacción, por lo que si la aplicación se bloquea después de insertar el registro en la base de datos pero antes de enviar el evento a la cola, se pierde el evento.

Por supuesto, existen soluciones alternativas para evitar esto: una solución simple es “almacenar” el evento en una tabla de almacenamiento temporal en la misma transacción en la que se escribe el registro, y luego depender de un proceso captura de datos para capturar el cambio en la tabla de almacenamiento y enviar el evento a la cola de mensajes. La transacción es atómica y el proceso de captura de datos puede asegurar que el evento se entregue al menos una vez.

Para capturar cambios, la captura de datos típicamente utiliza uno de dos métodos: basado en registro o basado en disparadores.

La captura de datos basado en registro implica leer los registros de transacciones de la base de datos para identificar los cambios de datos, que es el método que utilizaremos aquí al utilizar la replicación lógica de Postgres.

Replicación de Postgres

Hay dos modos de replicación en Postgres:

  1. Replicación física: cada cambio del primario se transmite a las réplicas a través del WAL (Write Ahead Log). Esta replicación se realiza byte por byte con direcciones de bloque exactas.
  1. Replicación lógica: en la replicación lógica, el suscriptor recibe cada cambio de transacción individual (es decir, declaraciones INSERT, UPDATE o DELETE) en la base de datos.

El WAL todavía se transmite, pero codifica las operaciones lógicas para que puedan ser decodificadas por el suscriptor sin tener que conocer los detalles internos de Postgres.

Una de las grandes ventajas de la replicación lógica es que se puede utilizar para replicar sólo tablas o filas específicas, lo que significa que se tiene un control completo sobre lo que se está replicando.

Para habilitar la replicación lógica, el wal_level debe ser configurado:

-- determines how much information is written to the wal. 
-- Each 'level' inherits the level below it; 'logical' is the highest level

ALTER SYSTEM SET wal_level=logical;

-- simultaneously running WAL sender processes
ALTER SYSTEM SET max_wal_senders='10';

-- simultaneously defined replication slots
ALTER SYSTEM SET max_replication_slots='10';

Los cambios requieren un reinicio de la instancia de Postgres.

Después de reiniciar el sistema, el wal_level se puede verificar con:

SHOW wal_level;
 wal_level 
-----------
 logical
(1 row)

Para suscribirse a los cambios se debe crear una publicación. Una publicación es un grupo de tablas en las que nos gustaría recibir cambios de datos.

Vamos a crear una tabla simple y definir una publicación para ella:

CREATE TABLE articles (id serial PRIMARY KEY, title text, description text, body text);
CREATE PUBLICATION articles_pub FOR TABLE articles;

Para indicar a Postgres que retenga segmentos de WAL, debemos crear un slot de replicación.

El slot de replicación representa un flujo de cambios desde una o más publicaciones y se utiliza para prevenir la pérdida de datos en caso de una falla del servidor, ya que son a prueba de fallos.

Protocolo de Replicación

Para tener una idea del protocolo y los mensajes que se envían, podemos usar pg_recvlogical para iniciar un suscriptor de replicación:

# Start and use the publication defined above
# output is written to stdout
pg_recvlogical --start \
  --host='localhost' \
  --port='5432' \
  --username='postgres' \
  --dbname='postgres' \
  --option=publication_names='articles_pub' \
  --option=proto_version=1 \
  --create-slot \
  --if-not-exists \
  --slot=articles_slot \
  --plugin=pgoutput \
  --file=-

Insertar un registro:

INSERT INTO articles (title, description, body)
    VALUES ('Postgres replication', 'Using logical replication', 'Foo bar baz');

Cada linea en la salida corresponde a un mensaje de replicación recibido a través de suscripción:

B(egin) - Begin transaction 
R(elation) - Table, schema, columns and their types
I(insert) - Data being inserted
C(ommit) - Commit transaction

___________________________________

B

Rarticlesdidtitledescriptionbody
It35tPostgres replicationtUsing logical replicationtFoo bar baz
C

Si insertamos múltiples registros en una transacción deberíamos tener dos I entre B y C:

BEGIN;
INSERT INTO articles (title, description, body) VALUES ('First', 'desc', 'Foo');

INSERT INTO articles (title, description, body) VALUES ('Second', 'desc', 'Bar');
COMMIT;

Y la salida:

C
B

It37tFirsttdesctFoo
It38tSecondtdesctBar
CCopied to clipboard!

La información de la relación, es decir, la tabla, no se transmitió porque ya se recibió la relación al insertar el primer registro.

Postgres solo envía la relación la primera vez que se encuentra durante la sesión. Se espera que el suscriptor almacene en caché una relación previamente enviada.

Ahora que tenemos una idea de cómo funciona la replicación lógica, ¡implementémosla en Elixir!

Implementando la conexión de replicación

Cree un nuevo proyecto de Elixir:

mix new cdc

Añadiremos las siguientes dependencias a mix.exs:

defp deps do
  {:postgrex, "~> 0.16.4"},
  # decode/encode replication messages
  {:postgrex_pgoutput, "~> 0.1.0"}
end

Postgrex admite la replicación a través del proceso Postgrex.ReplicationConnection.

defmodule CDC.Replication do
  use Postgrex.ReplicationConnection
  require Logger

  defstruct [
    :publications,
    :slot,
    :state
  ]

  def start_link(opts) do
    conn_opts = [auto_reconnect: true]
    publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
    slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      {slot, publications},
      conn_opts ++ opts
    )
  end

  @impl true
  def init({slot, pubs}) do
    {:ok, %__MODULE__{slot: slot, publications: pubs}}
  end

  
  @impl true
  def handle_connect(%__MODULE__{slot: slot} = state) do
    query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"

    Logger.debug("[create slot] query=#{query}")

    {:query, query, %{state | state: :create_slot}}
  end

  @impl true
  def handle_result([%Postgrex.Result{} | _], %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state) do
    opts = [proto_version: 1, publication_names: pubs]

    query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"

    Logger.debug("[start streaming] query=#{query}")

    {:stream, query, [], %{state | state: :streaming}}
  end

  @impl true
  def handle_data(msg, state) do
    Logger.debug("Received msg=#{inspect(msg, limit: :infinity, pretty: true)}")
    {:noreply, [], state}
  end

  defp escape_options([]),
    do: ""

  defp escape_options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)

    [?\s, ?(, parts, ?)]
  end

  defp escape_string(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end

El código esta disponible en GitHub

Probemos:

opts = [
  slot: "articles_slot_elixir",
  publications: ["articles_pub"],
  host: "localhost",
  database: "postgres",
  username: "postgres",
  password: "postgres",
  port: 5432,
]

CDC.Replication.start_link(opts)

Cuando iniciamos el proceso, ocurre lo siguiente:

  1. Una vez que estamos conectados a Postgres, se llama al callback handle_connect/1, se crea un slot de replicación lógica temporal.
  2. Se llama a handle_result/2 con el resultado de la consulta en el paso 1. Si el slot se creó correctamente, comenzamos a transmitir desde el slot y entramos en el modo de transmisión. La posición solicitada ‘0/0’ significa que Postgres elige la posición.
  3. Cualquier mensaje de replicación enviado desde Postgres se recibe en el callback handle_data/2.

Mensajes de replicación

Hay dos tipos de mensajes que un suscriptor recibe:

  1. primary_keep_alive: un mensaje de comprobación, si reply == 1 se espera que el suscriptor responda al mensaje con un standby_status_update para evitar una desconexión por tiempo de espera.

El standby_status_update contiene el LSN actual que el suscriptor ha procesado.

Postgres utiliza este mensaje para determinar qué segmentos de WAL se pueden eliminar de forma segura.

  1. xlog_data: contiene los mensajes de datos para cada paso en una transacción.Dado que no estamos respondiendo a los mensajes primary_keep_alive, el proceso se desconecta y se reinicia.

Arreglemos esto decodificando los mensajes y comenzando a responder con mensajes standby_status_update.


defmodule CDC.Replication do
  use Postgrex.ReplicationConnection

  require Postgrex.PgOutput.Messages
  alias Postgrex.PgOutput.{Messages, Lsn}

  require Logger

  defstruct [
    :publications,
    :slot,
    :state
  ]

  def start_link(opts) do
    conn_opts = [auto_reconnect: true]
    publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
    slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      {slot, publications},
      conn_opts ++ opts
    )
  end

  @impl true
  def init({slot, pubs}) do
    {:ok, %__MODULE__{slot: slot, publications: pubs}}
  end

  @impl true
  def handle_connect(%__MODULE__{slot: slot} = state) do
    query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"

    Logger.debug("[create slot] query=#{query}")

    {:query, query, %{state | state: :create_slot}}
  end

  @impl true
  def handle_result(
        [%Postgrex.Result{} | _],
        %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
      ) do
    opts = [proto_version: 1, publication_names: pubs]

    query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"

    Logger.debug("[start streaming] query=#{query}")

    {:stream, query, [], %{state | state: :streaming}}
  end

  @impl true
  def handle_data(msg, state) do
    return_msgs =
      msg
      |> Messages.decode()
      |> handle_msg()

    {:noreply, return_msgs, state}
  end

  #
  defp handle_msg(Messages.msg_primary_keep_alive(server_wal: lsn, reply: 1)) do
    Logger.debug("msg_primary_keep_alive message reply=true")
    <<lsn::64>> = Lsn.encode(lsn)

    [standby_status_update(lsn)]
  end

  defp handle_msg(Messages.msg_primary_keep_alive(reply: 0)), do: []

  defp handle_msg(Messages.msg_xlog_data(data: data)) do
    Logger.debug("xlog_data message: #{inspect(data, pretty: true)}")
    []
  end

  defp standby_status_update(lsn) do
    [
      wal_recv: lsn + 1,
      wal_flush: lsn + 1,
      wal_apply: lsn + 1,
      system_clock: Messages.now(),
      reply: 0
    ]
    |> Messages.msg_standby_status_update()
    |> Messages.encode()
  end

  
defp escape_options([]),
    do: ""

  defp escape_options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)

    [?\s, ?(, parts, ?)]
  end

  defp escape_string(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end

handle_data/2 decodifica el mensaje y lo pasa a handle_msg/1. Si es un primary_keep_alive, respondemos con un standby_status_update.

El LSN denota una posición de byte en el WAL.

El suscriptor responde con el LSN que ha manejado actualmente, como no estamos haciendo seguimiento de los mensajes que recibimos, simplemente confirmamos con el LSN enviado desde el servidor.

A continuación, manejaremos los mensajes xlog_data, la idea aquí es que capturaremos cada operación en una estructura de transacción.

Capturando transacciones

El módulo CDC.Protocol manejará los mensajes xlog_data y rastreará el estado de la transacción

defmodule CDC.Protocol do
  import Postgrex.PgOutput.Messages
  require Logger

  alias CDC.Tx
  alias Postgrex.PgOutput.Lsn

  
@type t :: %__MODULE__{
          tx: Tx.t(),
          relations: map()
        }

  defstruct [
    :tx,
    relations: %{}
  ]

  @spec new() :: t()
  def new do
    %__MODULE__{}
  end

  def handle_message(msg, state) when is_binary(msg) do
    msg
    |> decode()
    |> handle_message(state)
  end

  def handle_message(msg_primary_keep_alive(reply: 0), state), do: {[], nil, state}
  def handle_message(msg_primary_keep_alive(server_wal: lsn, reply: 1), state) do
    Logger.debug("msg_primary_keep_alive message reply=true")
    <<lsn::64>> = Lsn.encode(lsn)

    {[standby_status_update(lsn)], nil, state}
  end

  def handle_message(msg, %__MODULE__{tx: nil, relations: relations} = state) do
    tx =
      [relations: relations, decode: true]
      |> Tx.new()
      |> Tx.build(msg)

    {[], nil, %{state | tx: tx}}
  end

  def handle_message(msg, %__MODULE__{tx: tx} = state) do
    case Tx.build(tx, msg) do
      %Tx{state: :commit, relations: relations} ->
        tx = Tx.finalize(tx)
        relations = Map.merge(state.relations, relations)
        {[], tx, %{state | tx: nil, relations: relations}}

      tx ->
        {[], nil, %{state | tx: tx}}
    end
  end

  defp standby_status_update(lsn) do
    [
      wal_recv: lsn + 1,
      wal_flush: lsn + 1,
      wal_apply: lsn + 1,
      system_clock: now(),
      reply: 0
    ]
    |> msg_standby_status_update()
    |> encode()
  end
end

CDC.Tx maneja mensajes recibidos dentro de la transacción, begin, relation, insert/update/delete y commit.

defmodule CDC.Tx do
  import Postgrex.PgOutput.Messages
  alias Postgrex.PgOutput.Lsn

  alias __MODULE__.Operation

  @type t :: %__MODULE__{
          operations: [Operation.t()],
          relations: map(),
          timestamp: term(),
          xid: pos_integer(),
          state: :begin | :commit,
          lsn: Lsn.t(),
          end_lsn: Lsn.t()
        }

  defstruct [
    :timestamp,
    :xid,
    :lsn,
    :end_lsn,
    relations: %{},
    operations: [],
    state: :begin,
    decode: true
  ]

  def new(opts \\ []) do
    struct(__MODULE__, opts)
  end

  def finalize(%__MODULE__{state: :commit, operations: ops} = tx) do
    %{tx | operations: Enum.reverse(ops)}
  end

  def finalize(%__MODULE__{} = tx), do: tx

  @spec build(t(), tuple()) :: t()
  def build(tx, msg_xlog_data(data: data)) do
    build(tx, data)
  end

  def build(tx, msg_begin(lsn: lsn, timestamp: ts, xid: xid)) do
    %{tx | lsn: lsn, timestamp: ts, xid: xid, state: :begin}
  end

  def build(%__MODULE__{state: :begin, relations: relations} = tx, msg_relation(id: id) = rel) do
    %{tx | relations: Map.put(relations, id, rel)}
  end

  def build(%__MODULE__{state: :begin, lsn: tx_lsn} = tx, msg_commit(lsn: lsn, end_lsn: end_lsn))
      when tx_lsn == lsn do
    %{tx | state: :commit, end_lsn: end_lsn}
  end

  def build(%__MODULE__{state: :begin} = builder, msg_insert(relation_id: id) = msg),
    do: build_op(builder, id, msg)

  def build(%__MODULE__{state: :begin} = builder, msg_update(relation_id: id) = msg),
    do: build_op(builder, id, msg)

  def build(%__MODULE__{state: :begin} = builder, msg_delete(relation_id: id) = msg),
    do: build_op(builder, id, msg)

  # skip unknown messages
  def build(%__MODULE__{} = tx, _msg), do: tx

  defp build_op(%__MODULE__{state: :begin, relations: rels, decode: decode} = tx, id, msg) do
    rel = Map.fetch!(rels, id)
    op = Operation.from_msg(msg, rel, decode)

    %{tx | operations: [op | tx.operations]}
  end
end

CDC.Tx.Operation maneja los mensajes INSERT/UPDATE/DELETE y decodifica los datos combinándolos con la relación

defmodule CDC.Tx.Operation do
  @moduledoc "Describes a change (INSERT, UPDATE, DELETE) within a transaction."

  import Postgrex.PgOutput.Messages
  alias Postgrex.PgOutput.Type, as: PgType

  @type t :: %__MODULE__{}
  defstruct [
    :type,
    :schema,
    :namespace,
    :table,
    :record,
    :old_record,
    :timestamp
  ]

  @spec from_msg(tuple(), tuple(), decode :: boolean()) :: t()
  def from_msg(
        msg_insert(data: data),
        msg_relation(columns: columns, namespace: ns, name: name),
        decode?
      ) do
    %__MODULE__{
      type: :insert,
      namespace: ns,
      schema: into_schema(columns),
      table: name,
      record: cast(data, columns, decode?),
      old_record: %{}
    }
  end

  def from_msg(
        msg_update(change_data: data, old_data: old_data),
        msg_relation(columns: columns, namespace: ns, name: name),
        decode?
      ) do
    %__MODULE__{
      type: :update,
      namespace: ns,
      table: name,
      schema: into_schema(columns),
      record: cast(data, columns, decode?),
      old_record: cast(columns, old_data, decode?)
    }
  end

  def from_msg(
        msg_delete(old_data: data),
        msg_relation(columns: columns, namespace: ns, name: name),
        decode?
      ) do
    %__MODULE__{
      type: :delete,
      namespace: ns,
      schema: into_schema(columns),
      table: name,
      record: %{},
      old_record: cast(data, columns, decode?)
    }
  end

  defp into_schema(columns) do
    for c <- columns do
      c
      |> column()
      |> Enum.into(%{})
    end
  end

  defp cast(data, columns, decode?) do
    Enum.zip_reduce([data, columns], %{}, fn [text, typeinfo], acc ->
      key = column(typeinfo, :name)

      value =
        if decode? do
          t =
            typeinfo
            |> column(:type)
            |> PgType.type_info()

          PgType.decode(text, t)
        else
          text
        end

      Map.put(acc, key, value)
    end)
  end
end

Como antes, el mensaje primary_keep_alive con reply == 1 envía un standby_status_update. Cuando recibimos un mensaje xlog_data, creamos un nuevo %Tx{} que usamos para “construir” la transacción hasta que recibimos un msg_commit que marca el final de la transacción.

Cualquier mensaje de inserción, actualización o eliminación crea una CDC.Tx.Operation en la transacción, cada operación contiene un relation_id que se utiliza para buscar la relación desde tx.relations.

La operación junto con la relación nos permite decodificar los datos. La información de columna y tipo se recupera de la relación y se utiliza para decodificar los valores en términos de Elixir.

Una vez que estamos en un estado de commit, fusionamos Tx.relations con Protocol.relations, ya que un mensaje de relación sólo se transmitirá la primera vez que se encuentre una tabla durante la sesión de conexión, Protocol.relations contiene todos los msg_relation que se nos han enviado durante la sesión.

El módulo CDC.Replication ahora se ve así:

defmodule CDC.Replication do
  use Postgrex.ReplicationConnection

  alias CDC.Protocol

  require Logger

  defstruct [
    :publications,
    :protocol,
    :slot,
    :state
  ]

  def start_link(opts) do
    conn_opts = [auto_reconnect: true]
    publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
    slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      {slot, publications},
      conn_opts ++ opts
    )
  end

  @impl true
  def init({slot, pubs}) do
    {:ok,
     %__MODULE__{
       slot: slot,
       publications: pubs,
       protocol: Protocol.new()
     }}
  end

  @impl true
  def handle_connect(%__MODULE__{slot: slot} = state) do
    query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"

    Logger.debug("[create slot] query=#{query}")

    {:query, query, %{state | state: :create_slot}}
  end

  @impl true
  def handle_result(
        [%Postgrex.Result{} | _],
        %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
      ) do
    opts = [proto_version: 1, publication_names: pubs]

    query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"

    Logger.debug("[start streaming] query=#{query}")

    {:stream, query, [], %{state | state: :streaming}}
  end

  @impl true
  def handle_data(msg, state) do
    {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)

    if not is_nil(tx) do
      Logger.debug("Tx: #{inspect(tx, pretty: true)}")
    end

    {:noreply, return_msgs, %{state | protocol: protocol}}
  end

  
defp escape_options([]),
    do: ""

  defp escape_options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)

    [?\s, ?(, parts, ?)]
  end

  defp escape_string(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end

handle_data/2 llama a Protocol.handle_message/1 que devuelve una tupla con tres elementos {messages_to_send :: [binary()], complete_transaction :: CDC.Tx.t() | nil, CDC.Protocolo.t()}

Por ahora solo inspeccionamos la transacción cuando se emite desde Protocol.handle_message/3, probémoslo:

Interactive Elixir (1.14.0) - press Ctrl+C to exit (type h() ENTER for help)
opts = [
  slot: "articles_slot_elixir",
  publications: ["articles_pub"],
  host: "localhost",
  database: "postgres",
  username: "postgres",
  password: "postgres",
  port: 5432,
]

{:ok, _} = CDC.Replication.start_link(opts)
{:ok, pid} = Postgrex.start_link(opts)

insert_query = """
INSERT INTO articles (title, description, body) 
VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
"""

_ = Postgrex.query!(pid, insert_query, [])
  
14:03:48.020 [debug] Tx: %CDC.Tx{
  timestamp: ~U[2022-10-31 13:03:48Z],
  xid: 494,
  lsn: {0, 22981920},
  end_lsn: nil,
  relations: %{
    16386 => {:msg_relation, 16386, "public", "articles", :default,
     [
       {:column, [:key], "id", :int4, -1},
       {:column, [], "title", :text, -1},
       {:column, [], "description", :text, -1},
       {:column, [], "body", :text, -1}
     ]}
  },
  operations: [
    %CDC.Tx.Operation{
      type: :insert,
      schema: [
        %{flags: [:key], modifier: -1, name: "id", type: :int4},
        %{flags: [], modifier: -1, name: "title", type: :text},
        %{flags: [], modifier: -1, name: "description", type: :text},
        %{flags: [], modifier: -1, name: "body", type: :text}
      ],
      namespace: "public",
      table: "articles",
      record: %{
        "body" => "with Elixir!",
        "description" => "Using logical replication",
        "id" => 6,
        "title" => "Postgres replication"
      },
      old_record: %{},
      timestamp: nil
    }
  ],
  state: :begin,
  decode: true
}

Cada cambio en la transacción se almacena en Tx.operations, operation.record es la fila decodificada como un mapa.

Finalmente, implementemos una forma de suscribirnos a los cambios de CDC.Replication:

defmodule CDC.Replication do
  use Postgrex.ReplicationConnection

  alias CDC.Protocol

  require Logger

  defstruct [
    :publications,
    :protocol,
    :slot,
    :state,
    subscribers: %{}
  ]

  def start_link(opts) do
    conn_opts = [auto_reconnect: true]
    publications = opts[:publications] || raise ArgumentError, message: "`:publications` is required"
    slot = opts[:slot] || raise ArgumentError, message: "`:slot` is required"

    Postgrex.ReplicationConnection.start_link(
      __MODULE__,
      {slot, publications},
      conn_opts ++ opts
    )
  end

  def subscribe(pid, opts \\ []) do
    Postgrex.ReplicationConnection.call(pid, :subscribe, Keyword.get(opts, :timeout, 5_000))
  end

  def unsubscribe(pid, ref, opts \\ []) do
    Postgrex.ReplicationConnection.call(
      pid,
      {:unsubscribe, ref},
      Keyword.get(opts, :timeout, 5_000)
    )
  end

  @impl true
  def init({slot, pubs}) do
    {:ok,
     %__MODULE__{
       slot: slot,
       publications: pubs,
       protocol: Protocol.new()
     }}
  end

  @impl true
  def handle_connect(%__MODULE__{slot: slot} = state) do
    query = "CREATE_REPLICATION_SLOT #{slot} TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"

    Logger.debug("[create slot] query=#{query}")

    {:query, query, %{state | state: :create_slot}}
  end

  @impl true
  def handle_result(
        [%Postgrex.Result{} | _],
        %__MODULE__{state: :create_slot, publications: pubs, slot: slot} = state
      ) do
    opts = [proto_version: 1, publication_names: pubs]

    query = "START_REPLICATION SLOT #{slot} LOGICAL 0/0 #{escape_options(opts)}"

    Logger.debug("[start streaming] query=#{query}")

    {:stream, query, [], %{state | state: :streaming}}
  end

  @impl true
  def handle_data(msg, state) do
    {return_msgs, tx, protocol} = Protocol.handle_message(msg, state.protocol)

    if not is_nil(tx) do
      notify(tx, state.subscribers)
    end

    {:noreply, return_msgs, %{state | protocol: protocol}}
  end

  # Replies must be sent using `reply/2`
  # https://hexdocs.pm/postgrex/Postgrex.ReplicationConnection.html#reply/2
  @impl true
  def handle_call(:subscribe, {pid, _} = from, state) do
    ref = Process.monitor(pid)

    state = put_in(state.subscribers[ref], pid)

    Postgrex.ReplicationConnection.reply(from, {:ok, ref})

    {:noreply, state}
  end

  def handle_call({:unsubscribe, ref}, from, state) do
    {reply, new_state} =
      case state.subscribers do
        %{^ref => _pid} ->
          Process.demonitor(ref, [:flush])

          {_, state} = pop_in(state.subscribers[ref])
          {:ok, state}

        _ ->
          {:error, state}
      end

    from && Postgrex.ReplicationConnection.reply(from, reply)

    {:noreply, new_state}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _, _}, state) do
    handle_call({:unsubscribe, ref}, nil, state)
  end

  defp notify(tx, subscribers) do
    for {ref, pid} <- subscribers do
      send(pid, {:notification, self(), ref, tx})
    end

    :ok
  end

  defp escape_options([]),
    do: ""

  defp escape_options(opts) do
    parts =
      Enum.map_intersperse(opts, ", ", fn {k, v} -> [Atom.to_string(k), ?\s, escape_string(v)] end)

    [?\s, ?(, parts, ?)]
  end

  defp escape_string(value) do
    [?', :binary.replace(to_string(value), "'", "''", [:global]), ?']
  end
end

Y lo podemos usar así:

opts = [
  slot: "articles_slot",
  publications: ["articles_pub"],
  host: "localhost",
  database: "postgres",
  username: "postgres",
  password: "postgres",
  port: 5432,
]

{:ok, pid} = CDC.Replication.start_link(opts)
{:ok, pg_pid} = Postgrex.start_link(opts)
{:ok, ref} = CDC.Replication.subscribe(pid)

insert_query = """
INSERT INTO articles (title, description, body) 
VALUES ('Postgres replication', 'Using logical replication', 'with Elixir!')
"""

_ = Postgrex.query!(pg_pid, insert_query, [])
flush()

{:notification, #PID<0.266.0>, #Reference<0.2499916608.3416784901.94813>,
 %CDC.Tx{
   timestamp: ~U[2022-10-31 13:26:35Z],
   xid: 495,
   lsn: {0, 22983536},
   end_lsn: nil,
   relations: %{
     16386 => {:msg_relation, 16386, "public", "articles", :default,
      [
        {:column, [:key], "id", :int4, -1},
        {:column, [], "title", :text, -1},
        {:column, [], "description", :text, -1},
        {:column, [], "body", :text, -1}
      ]}
   },
   operations: [
     %CDC.Tx.Operation{
       type: :insert,
       schema: [
         %{flags: [:key], modifier: -1, name: "id", type: :int4},
         %{flags: [], modifier: -1, name: "title", type: :text},
         %{flags: [], modifier: -1, name: "description", type: :text},
         %{flags: [], modifier: -1, name: "body", type: :text}
       ],
       namespace: "public",
       table: "articles",
       record: %{
         "body" => "with Elixir!",
         "description" => "Using logical replication",
         "id" => 7,
         "title" => "Postgres replication"
       },
       old_record: %{},
       timestamp: nil
     }
   ],
   state: :begin,
   decode: true
 }}

Conclusión

Si está buscando una manera de capturar cambios de su base de datos con cambios mínimos en su configuración existente, definitivamente vale la pena considerar Cambiar la captura de datos. Con Elixir y postgrex hemos implementado un mini Debezium en ~400 LOC. La fuente completa está disponible aquí.

Si necesita ayuda con la implementación de Elixir, nuestro equipo de expertos líder en el mundo siempre está aquí para ayudarlo. Contáctenos hoy para saber cómo podemos ayudarlo.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK