Coordenação: Ratis
Ratis é um arcabouço de coordenação recentemente emancipado como um projeto no Apache.
Embora mal documentado, o projeto tem alguns exemplos que demonstram como usar abstrações já implementadas.
A seguir veremos um passo-a-passo, baseado nestes exemplos, de como usar o Ratis para implementar uma máquina de estados replicada.
Crie um novo projeto Maven com o nome ChaveValor
(eu estou usando IntelliJ, mas as instruções devem ser semelhantes para Eclipse).
Abra o arquivo pom.xml
do seu projeto e adicione o seguinte trecho, com as dependências do projeto, incluindo o próprio Ratis.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 | <dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.ratis/ratis-server -->
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-server</artifactId>
<version>2.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.ratis/ratis-netty -->
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-netty</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-grpc</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
<version>1.83</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
|
Adicione também o plugin Maven e o plugin para gerar um .jar
com todas as dependências. Observe que estou usando Java 17, mas você pode mudar para a sua versão.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | <build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.compiler.version}</version>
<configuration>
<source>17</source>
<target>17</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
|
Crie uma nova classe denominada Cliente
no arquivo Cliente.java
.
Nesta classe, iremos criar um objeto RaftClient
que será usado para enviar operações para os servidores.
Esta classe é importada juntamente com outras várias dependências, adicionadas no pom.xml
, que devemos instanciar antes do RaftClient
.
Neste exemplo eu coloco praticamente todos os parâmetros de configuração do Ratis hardcoded para simplificar o código.
Obviamente que voce deveria ler estes parâmetros como argumentos para o programa ou de um arquivo de configuração.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcFactory;
import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
public class Cliente
{
|
O campo raftGroupId
identifica um cluster Ratis; isso quer dizer que um mesmo processo pode participar de vários clusters, mas aqui nos focaremos em apenas um. O valor do campo deve ter exatamente caracteres, o que soma 32 bytes em java, e será interpretado como um UUID.
id2addr
é um mapa do identificador de cada processo no cluster para seu endereço IP + Porta.
Aqui usei várias portas distintas porquê todos os processos estão rodando na mesma máquina, mas se estivesse executando em máquinas distintas, com IP distintos, poderia usar a mesma porta em todos.
addresses
é uma lista de RaftPeer
construída a parti de id2addr
.
O campo raftGroup
é uma referência a todos os servidores, associados ao identificador do grupo, raftGroupId
.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | public static void main(String args[]) throws IOException
{
String raftGroupId = "raft_group____um"; // 16 caracteres.
Map<String,InetSocketAddress> id2addr = new HashMap<>();
id2addr.put("p1", new InetSocketAddress("127.0.0.1", 3000));
id2addr.put("p2", new InetSocketAddress("127.0.0.1", 3500));
id2addr.put("p3", new InetSocketAddress("127.0.0.1", 4000));
List<RaftPeer> addresses = id2addr.entrySet()
.stream()
.map(e -> RaftPeer.newBuilder().setId(e.getKey()).setAddress(e.getValue()).build())
.collect(Collectors.toList());
final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(raftGroupId)), addresses);
|
Uma vez criado o grupo, criamos o cliente usando a fábrica retornada por RaftClient.newBuilder()
.
A fábrica deve ser configurada com os dados do grupo e o tipo de transporte, neste caso gRPC.
Também é necessário o identificador do processo que está se conectando ao grupo; neste caso, usamos um identificador aleatório qualquer, diferente do que faremos com os servidores.
| RaftProperties raftProperties = new RaftProperties();
RaftClient client = RaftClient.newBuilder()
.setProperties(raftProperties)
.setRaftGroup(raftGroup)
.setClientRpc(new GrpcFactory(new Parameters())
.newRaftClientRpc(ClientId.randomId(), raftProperties))
.build();
|
Uma vez criado o cliente, podemos fazer invocações de operações nos servidores. Cada operação será invocada em todos os servidores, na mesma ordem.
Este protótipo suporta duas operações, add
e get
, incluindo algumas variações, que ignoraremos por enquanto.
A operação add
é codificada como uma String
, add:k:v
, onde k
e v
são do tipo String
. add:k:v
adiciona uma entrada em um mapa implementado pelo nosso servidor com chave k
e valor v
.
Já a operação get:k
recupera o valor v
associado à chave k
, se presente no mapa.
O método RaftClient.io().send
é usado para enviar modificações para as réplicas e deve, necessariamente, passar pelo protocolo Raft.
Já o método RaftClient.io().sendReadOnly
é usado para enviar consultas a qualquer das réplicas.
Ambos os métodos codificam o comando sendo enviado (add:k:v
ou get:k
) no formato interno do Ratis para as réplicas e retorna um objeto RaftClientReply
, que pode ser usado para pegar a resposta da operação.
O código é auto explicativo.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 | RaftClientReply getValue;
CompletableFuture<RaftClientReply> compGetValue;
String response;
switch (args[0]){
case "add":
getValue = client.io().send(Message.valueOf("add:" + args[1] + ":" + args[2]));
response = getValue.getMessage().getContent().toString(Charset.defaultCharset());
System.out.println("Resposta:" + response);
break;
case "get":
getValue = client.io().sendReadOnly(Message.valueOf("get:" + args[1]));
response = getValue.getMessage().getContent().toString(Charset.defaultCharset());
System.out.println("Resposta:" + response);
break;
case "add_async":
compGetValue = client.async().send(Message.valueOf("add:" + args[1] + ":" + args[2]));
getValue = compGetValue.get();
response = getValue.getMessage().getContent().toString(Charset.defaultCharset());
System.out.println("Resposta: " + response);
break;
case "get_stale":
getValue = client.io().sendStaleRead(Message.valueOf("get:" + args[1]), 0, RaftPeerId.valueOf(args[2]));
response = getValue.getMessage().getContent().toString(Charset.defaultCharset());
System.out.println("Resposta: " + response);
break;
default:
System.out.println("comando inválido");
}
client.close();
}
}
|
Um vez criado o cliente, crie a classe Servidor
, no arquivo Servidor.java
; a parte inicial do código é semelhante à do cliente.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37 | import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class Servidor
{
//Parametros: myId
public static void main(String args[]) throws IOException, InterruptedException
{
String raftGroupId = "raft_group____um"; // 16 caracteres.
//Setup for node all nodes.
Map<String,InetSocketAddress> id2addr = new HashMap<>();
id2addr.put("p1", new InetSocketAddress("127.0.0.1", 3000));
id2addr.put("p2", new InetSocketAddress("127.0.0.1", 3500));
id2addr.put("p3", new InetSocketAddress("127.0.0.1", 4000));
List<RaftPeer> addresses = id2addr.entrySet()
.stream()
.map(e -> RaftPeer.newBuilder().setId(e.getKey()).setAddress(e.getValue()).build())
.collect(Collectors.toList());
|
A primeira diferença vem na necessidade de identificar o servidor dentro do conjunto de servidores, o que é feito com um RaftPeerId
.
Como cada servidor deve usar um identificador único, do conjunto pré-determinado em id2addr
, o identificador é passado como argumento para o programa, obrigatoriamente.
| //Setup for this node.
RaftPeerId myId = RaftPeerId.valueOf(args[0]);
if (addresses.stream().noneMatch(p -> p.getId().equals(myId)))
{
System.out.println("Identificador " + args[0] + " é inválido.");
System.exit(1);
}
|
Encare a seção seguinte como uma receita, mas observe que o método RaftServerConfigKeys.setStorageDir
recebe o nome de uma pasta como argumento, que será usada para armazenar o estado da máquina de estados.
Se você executar o servidor múltiplas vezes, a cada nova execução o estado anterior do sistema será recuperado desta pasta.
Para limpar o estado, apague as pastas de cada servidor.
| RaftProperties properties = new RaftProperties();
properties.setInt(GrpcConfigKeys.OutputStream.RETRY_TIMES_KEY, Integer.MAX_VALUE);
GrpcConfigKeys.Server.setPort(properties, 1000);
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(new File("/tmp/" + myId)));
|
A máquina de estados em si é especificada no próximo excerto, em setStateMachine
, que veremos a seguir.
| //Join the group of processes.
final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(raftGroupId)), id2addr);
RaftServer raftServer = RaftServer.newBuilder()
.setServerId(myId)
.setStateMachine(new MaquinaDeEstados())
.setProperties(properties)
.setGroup(raftGroup)
.build();
raftServer.start();
|
Uma vez iniciado o servidor, basta esperar que ele termine antes de sair do programa.
| while(raftServer.getLifeCycleState() != LifeCycle.State.CLOSED) {
TimeUnit.SECONDS.sleep(1);
}
}
}
|
Vamos agora para a definição da classe MaquinaDeEstados
, no arquivo MaquinaDeEstados.java
.
Esta classe deve implementar a interface org.apache.ratis.statemachine.StateMachine
e seus vários métodos ou, mais simples, estende org.apache.ratis.statemachine.impl.BaseStateMachine
, a abordagem que usaremos aqui.
| public class MaquinaDeEstados extends BaseStateMachine
{
|
Por enquanto, ignoraremos o armazenamento do estado em disco, mantendo-o simplesmente em memória no campo key2values
, e simplesmente implementaremos o processamento de comandos, começando pela implementação do método query
.
Este método é reponsável por implementar operações que não alteram o estado da máquina de estados, enviadas com o método RaftClient::sendReadOnly
. A única query
no nosso sistema é o get
.
No código, o conteúdo da requisição enviada pelo cliente deve ser recuperado em quebrado em operação (get
) e chave , usando :
como delimitador.
Recuperado o valor associado à chave, o mesmo é colocado em um CompletableFuture
e retornado.
| private final Map<String, String> key2values = new ConcurrentHashMap<>();
@Override
public CompletableFuture<Message> query(Message request) {
final String[] opKey = request.getContent().toString(Charset.defaultCharset()).split(":");
final String result = opKey[0]+ ":"+ key2values.get(opKey[1]);
LOG.debug("{}: {} = {}", opKey[0], opKey[1], result);
return CompletableFuture.completedFuture(Message.valueOf(result));
}
|
O método applyTransaction
implementa operações que alteram o estado, como add
, enviadas com o método RaftClient::send
.
Da mesma forma que em get
, a operação deve ser recuperada em quebrada em operação (add
), chave e valor, usando :
como delimitador.
1
2
3
4
5
6
7
8
9
10
11
12
13
14 | @Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
final RaftProtos.LogEntryProto entry = trx.getLogEntry();
final String[] opKeyValue = entry.getStateMachineLogEntry().getLogData().toString(Charset.defaultCharset()).split(":");
final String result = opKeyValue[0]+ ":"+ key2values.put(opKeyValue[1], opKeyValue[2]);
final CompletableFuture<Message> f = CompletableFuture.completedFuture(Message.valueOf(result));
final RaftProtos.RaftPeerRole role = trx.getServerRole();
LOG.info("{}:{} {} {}={}", role, getId(), opKeyValue[0], opKeyValue[1], opKeyValue[2]);
return f;
}
|
Pronto, você já tem uma máquina de estados replicada, bastando agora apenas compilá-la e executá-la.
Para compilar, de raiz do projeto execute o comando mvn package
.
A primeira vez que faz isso pode demorar um pouco pois várias dependências são baixadas da Internet.
Ao final da execução do comando você deveria ver algo semelhante ao seguinte
| ...
INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.793 s
[INFO] Finished at: 2020-12-06T23:06:32-03:00
[INFO] ------------------------------------------------------------------------
|
Então, em três terminais diferentes, execute os seguintes comandos:
| java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Servidor p1
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Servidor p2
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Servidor p3
|
Para executar o cliente, em um outro terminal, faça, por exemplo,
| java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Cliente add k1 testek1
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Cliente get k1
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Cliente add k2 testek2
|
Todo o código está disponível no Github
Exercício
Operações assíncronas
TODO
- Operações assíncronas usando
async()
em vez de io()
.
CompletableFuture
Leituras "velhas"
TODO
- stale reads usando
sendStaleRead
em vez de sendRead
.
- índice inicial
- nó
java -cp target/ChaveValor-1.0-SNAPSHOT-jar-with-dependencies.jar Cliente get_stale k1 p1