Ir para o conteúdo

Coordenação: Ratis

Ratis é um arcabouço de coordenação recentemente emancipado como um projeto no Apache. Embora mal documentado, o projeto tem alguns exemplos que demonstram como usar abstrações já implementadas. A seguir veremos um passo-a-passo, baseado nestes exemplos, de como usar o Ratis para implementar uma máquina de estados replicada.

Crie um novo projeto Maven com o nome ChaveValor (eu estou usando IntelliJ, mas as instruções devem ser semelhantes para Eclipse).

Novo Projeto Maven

Abra o arquivo pom.xml do seu projeto e adicione o seguinte trecho, com as dependências do projeto, incluindo o próprio Ratis.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.ratis/ratis-server -->
    <dependency>
        <groupId>org.apache.ratis</groupId>
        <artifactId>ratis-server</artifactId>
        <version>2.2.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.ratis/ratis-netty -->
    <dependency>
        <groupId>org.apache.ratis</groupId>
        <artifactId>ratis-netty</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.ratis</groupId>
        <artifactId>ratis-grpc</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>com.beust</groupId>
        <artifactId>jcommander</artifactId>
        <version>1.83</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.17.1</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.17.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.17.1</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Adicione também o plugin Maven e o plugin para gerar um .jar com todas as dependências. Observe que estou usando Java 17, mas você pode mudar para a sua versão.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>${maven.compiler.version}</version>
            <configuration>
                <source>17</source>
                <target>17</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
        </plugin>
    </plugins>
</build>

Crie uma nova classe denominada Cliente no arquivo Cliente.java. Nesta classe, iremos criar um objeto RaftClient que será usado para enviar operações para os servidores. Esta classe é importada juntamente com outras várias dependências, adicionadas no pom.xml, que devemos instanciar antes do RaftClient.

Neste exemplo eu coloco praticamente todos os parâmetros de configuração do Ratis hardcoded para simplificar o código. Obviamente que voce deveria ler estes parâmetros como argumentos para o programa ou de um arquivo de configuração.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

public class Cliente
{

O campo raftGroupId identifica um cluster Ratis; isso quer dizer que um mesmo processo pode participar de vários clusters, mas aqui nos focaremos em apenas um. O valor do campo deve ter exatamente caracteres, o que soma 32 bytes em java, e será interpretado como um UUID.

id2addr é um mapa do identificador de cada processo no cluster para seu endereço IP + Porta. Aqui usei várias portas distintas porquê todos os processos estão rodando na mesma máquina, mas se estivesse executando em máquinas distintas, com IP distintos, poderia usar a mesma porta em todos.

addresses é uma lista de RaftPeer construída a parti de id2addr.

O campo raftGroup é uma referência a todos os servidores, associados ao identificador do grupo, raftGroupId.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    public static void main(String args[]) throws IOException
    {
        String raftGroupId = "raft_group____um"; // 16 caracteres.

        Map<String,InetSocketAddress> id2addr = new HashMap<>();
        id2addr.put("p1", new InetSocketAddress("127.0.0.1", 3000));
        id2addr.put("p2", new InetSocketAddress("127.0.0.1", 3500));
        id2addr.put("p3", new InetSocketAddress("127.0.0.1", 4000));

        List<RaftPeer> addresses = id2addr.entrySet()
                .stream()
                .map(e -> RaftPeer.newBuilder().setId(e.getKey()).setAddress(e.getValue()).build())
                .collect(Collectors.toList());

        final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(raftGroupId)), addresses);

Uma vez criado o grupo, criamos o cliente usando a fábrica retornada por RaftClient.newBuilder(). A fábrica deve ser configurada com os dados do grupo e o tipo de transporte, neste caso gRPC. Também é necessário o identificador do processo que está se conectando ao grupo; neste caso, usamos um identificador aleatório qualquer, diferente do que faremos com os servidores.

1
2
3
4
5
6
7
8
        RaftProperties raftProperties = new RaftProperties();

        RaftClient client = RaftClient.newBuilder()
                                      .setProperties(raftProperties)
                                      .setRaftGroup(raftGroup)
                                      .setClientRpc(new GrpcFactory(new Parameters())
                                      .newRaftClientRpc(ClientId.randomId(), raftProperties))
                                      .build();

Uma vez criado o cliente, podemos fazer invocações de operações nos servidores. Cada operação será invocada em todos os servidores, na mesma ordem. Este protótipo suporta duas operações, add e get, incluindo algumas variações, que ignoraremos por enquanto. A operação add é codificada como uma String, add:k:v, onde k e v são do tipo String. add:k:v adiciona uma entrada em um mapa implementado pelo nosso servidor com chave k e valor v. Já a operação get:k recupera o valor v associado à chave k, se presente no mapa.

O método RaftClient.io().send é usado para enviar modificações para as réplicas e deve, necessariamente, passar pelo protocolo Raft. Já o método RaftClient.io().sendReadOnly é usado para enviar consultas a qualquer das réplicas. Ambos os métodos codificam o comando sendo enviado (add:k:v ou get:k) no formato interno do Ratis para as réplicas e retorna um objeto RaftClientReply, que pode ser usado para pegar a resposta da operação. O código é auto explicativo.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
        RaftClientReply getValue;
        CompletableFuture<RaftClientReply> compGetValue;
        String response;
        switch (args[0]){
            case "add":
                getValue = client.io().send(Message.valueOf("add:" + args[1] + ":" + args[2]));
                response = getValue.getMessage().getContent().toString(Charset.defaultCharset());
                System.out.println("Resposta:" + response);
                break;
            case "get":
                getValue = client.io().sendReadOnly(Message.valueOf("get:" + args[1]));
                response = getValue.getMessage().getContent().toString(Charset.defaultCharset());
                System.out.println("Resposta:" + response);
                break;
            case "add_async":
                compGetValue = client.async().send(Message.valueOf("add:" + args[1] + ":" + args[2]));
                getValue = compGetValue.get();
                response = getValue.getMessage().getContent().toString(Charset.defaultCharset());
                System.out.println("Resposta: " + response);
                break;
            case "get_stale":
                getValue = client.io().sendStaleRead(Message.valueOf("get:" + args[1]), 0, RaftPeerId.valueOf(args[2]));
                response = getValue.getMessage().getContent().toString(Charset.defaultCharset());
                System.out.println("Resposta: " + response);
                break;
            default:
                System.out.println("comando inválido");
        }

        client.close();
    }
}

Um vez criado o cliente, crie a classe Servidor, no arquivo Servidor.java; a parte inicial do código é semelhante à do cliente.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class Servidor
{

    //Parametros: myId
    public static void main(String args[]) throws IOException, InterruptedException
    {
        String raftGroupId = "raft_group____um"; // 16 caracteres.

        //Setup for node all nodes.
        Map<String,InetSocketAddress> id2addr = new HashMap<>();
        id2addr.put("p1", new InetSocketAddress("127.0.0.1", 3000));
        id2addr.put("p2", new InetSocketAddress("127.0.0.1", 3500));
        id2addr.put("p3", new InetSocketAddress("127.0.0.1", 4000));

        List<RaftPeer> addresses = id2addr.entrySet()
                                          .stream()
                                          .map(e -> RaftPeer.newBuilder().setId(e.getKey()).setAddress(e.getValue()).build())
                                          .collect(Collectors.toList());

A primeira diferença vem na necessidade de identificar o servidor dentro do conjunto de servidores, o que é feito com um RaftPeerId. Como cada servidor deve usar um identificador único, do conjunto pré-determinado em id2addr, o identificador é passado como argumento para o programa, obrigatoriamente.

1
2
3
4
5
6
7
8
        //Setup for this node.
        RaftPeerId myId = RaftPeerId.valueOf(args[0]);

        if (addresses.stream().noneMatch(p -> p.getId().equals(myId)))
        {
            System.out.println("Identificador " + args[0] + " é inválido.");
            System.exit(1);
        }

Encare a seção seguinte como uma receita, mas observe que o método RaftServerConfigKeys.setStorageDir recebe o nome de uma pasta como argumento, que será usada para armazenar o estado da máquina de estados. Se você executar o servidor múltiplas vezes, a cada nova execução o estado anterior do sistema será recuperado desta pasta. Para limpar o estado, apague as pastas de cada servidor.

1
2
3
4
        RaftProperties properties = new RaftProperties();
        properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, Integer.MAX_VALUE);
        GrpcConfigKeys.Server.setPort(properties, 1000);
        RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(new File("/tmp/" + myId)));

A máquina de estados em si é especificada no próximo excerto, em setStateMachine, que veremos a seguir.

1
2
3
4
5
6
7
8
9
        //Join the group of processes.
        final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(raftGroupId)), id2addr);
        RaftServer raftServer = RaftServer.newBuilder()
                .setServerId(myId)
                .setStateMachine(new MaquinaDeEstados())
                .setProperties(properties)
                .setGroup(raftGroup)
                .build();
        raftServer.start();

Uma vez iniciado o servidor, basta esperar que ele termine antes de sair do programa.

1
2
3
4
5
        while(raftServer.getLifeCycleState() != LifeCycle.State.CLOSED) {
            TimeUnit.SECONDS.sleep(1);
        }
    }
}

Vamos agora para a definição da classe MaquinaDeEstados, no arquivo MaquinaDeEstados.java. Esta classe deve implementar a interface org.apache.ratis.statemachine.StateMachine e seus vários métodos ou, mais simples, estende org.apache.ratis.statemachine.impl.BaseStateMachine, a abordagem que usaremos aqui.

1
2
public class MaquinaDeEstados extends BaseStateMachine
{

Por enquanto, ignoraremos o armazenamento do estado em disco, mantendo-o simplesmente em memória no campo key2values, e simplesmente implementaremos o processamento de comandos, começando pela implementação do método query.

Este método é reponsável por implementar operações que não alteram o estado da máquina de estados, enviadas com o método RaftClient::sendReadOnly. A única query no nosso sistema é o get. No código, o conteúdo da requisição enviada pelo cliente deve ser recuperado em quebrado em operação (get) e chave , usando : como delimitador. Recuperado o valor associado à chave, o mesmo é colocado em um CompletableFuture e retornado.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    private final Map<String, String> key2values = new ConcurrentHashMap<>();

    @Override
    public CompletableFuture<Message> query(Message request) {
        final String[] opKey = request.getContent().toString(Charset.defaultCharset()).split(":");
        final String result = opKey[0]+ ":"+ key2values.get(opKey[1]);

        LOG.debug("{}: {} = {}", opKey[0], opKey[1], result);
        return CompletableFuture.completedFuture(Message.valueOf(result));
    }

O método applyTransaction implementa operações que alteram o estado, como add, enviadas com o método RaftClient::send. Da mesma forma que em get, a operação deve ser recuperada em quebrada em operação (add), chave e valor, usando : como delimitador.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
    @Override
    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
        final RaftProtos.LogEntryProto entry = trx.getLogEntry();
        final String[] opKeyValue = entry.getStateMachineLogEntry().getLogData().toString(Charset.defaultCharset()).split(":");

        final String result = opKeyValue[0]+ ":"+ key2values.put(opKeyValue[1], opKeyValue[2]);

        final CompletableFuture<Message> f = CompletableFuture.completedFuture(Message.valueOf(result));

        final RaftProtos.RaftPeerRole role = trx.getServerRole();
        LOG.info("{}:{} {} {}={}", role, getId(), opKeyValue[0], opKeyValue[1], opKeyValue[2]);

        return f;
    }

Pronto, você já tem uma máquina de estados replicada, bastando agora apenas compilá-la e executá-la. Para compilar, de raiz do projeto execute o comando mvn package. A primeira vez que faz isso pode demorar um pouco pois várias dependências são baixadas da Internet. Ao final da execução do comando você deveria ver algo semelhante ao seguinte

1
2
3
4
5
6
7
...
INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  4.793 s
[INFO] Finished at: 2020-12-06T23:06:32-03:00
[INFO] ------------------------------------------------------------------------

Então, em três terminais diferentes, execute os seguintes comandos:

1
2
3
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Servidor p1
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Servidor p2
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Servidor p3

Para executar o cliente, em um outro terminal, faça, por exemplo,

1
2
3
4
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Cliente add k1 testek1
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Cliente get k1

java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Cliente add k2 testek2

Todo o código está disponível no Github

Exercício
  • Adicionar operações
    • del
    • clear
Operações assíncronas
TODO
  • Operações assíncronas usando async() em vez de io().
  • CompletableFuture
Leituras "velhas"
TODO
  • stale reads usando sendStaleRead em vez de sendRead.
  • índice inicial
  • java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Cliente get_stale k1 p1