implementação do kafka e redução de custos da infraestrutura

Implementação do Kafka e redução de custos

Kafka: entenda como reduzimos custos com infraestrutura
Por José Nunes – Analista de Sistemas

Selecionar um serviço e com apenas alguns cliques ter uma solução rodando em cloud é uma das possibilidades oferecidas pela AWS e que facilita a atuação de muitas empresas. Entretanto, quem trabalha com esse tipo de serviço sabe que, em algum momento, ele pode ser mais dispendioso que o esperado. Por isso, pensar em outras alternativas para solucionar problemas é fundamental para manter o negócio sustentável.

A adequação de ferramentas

Entre os serviços que já utilizamos, o Amazon Kinesis Firehose foi amplamente utilizado como parte de nossa arquitetura, na ingestão de eventos enviados pelos usuários. Contudo, com o crescimento da Cinnecta e, consequentemente, do número de clientes, maior também ficou o volume de tráfego de eventos. E, ainda que a solução estivesse preparada para receber milhões de requests diariamente, o custo com ingestão e conversão de formato foram fatores determinantes para analisarmos outras formas de atuação.

É aí que entra o Apache Kafka. Após a participação do time em eventos voltados para a tecnologia e da identificação do seu potencial em palestras e pesquisas, decidimos fazer uma prova de conceito (POC). Isso com o objetivo de reproduzir o que era feito com o Amazon Kinesis Firehose, a partir do Kafka, e receber os eventos no formato JSON e escrevê-los no formato Parquet em um bucket no S3.

Neste artigo vamos explicar todo o passo a passo executado pelo time de desenvolvimento para executar a POC – para isto pressupõe que o leitor já tenha algum conhecimento básico da plataforma de stream

I: Criação de um Broker

Para a criação do broker foi utilizado uma imagem ubuntu 20.04 LTS da AWS. Em seguida, criamos uma instância EC2, com a imagem citada –  o que exige a instalação do java, um pré-requisito para o Kafka. Para instalar o java, basta executar os seguintes comandos:

$ sudo apt-get update 
$ sudo apt upgrade
$ sudo add-apt-repository ppa:linuxuprising/java 
$ sudo apt install openjdk-11-jre-headless

Depois da instalação devidamente efetuada, a versão pode ser conferida com o seguinte comando:

$ java -version

II: Instalação do Kafka 

Nesta POC utilizamos a plataforma da Confluent, que é a empresa com mais contribuições no projeto do Apache Kafka. Para fazer a instalação, basta executar os seguintes comandos:

$ wget -qO - https://packages.confluent.io/deb/5.2/archive.key | sudo apt-key add -
$ sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.2 stable main"
$ sudo apt-get update && sudo apt-get install confluent-platform-2.12

Dessa forma, será utilizada a versão 5.2 da Confluent Platform. Todas versões podem ser conferidas aqui.

Junto da Confluent Platform, outros serviços também são instalados. Vamos explicar cada um deles abaixo

Kafka

Software de processamento distribuído de stream. O nosso agente principal do projeto – core de toda nossa solução.

Zookeeper

Serviço utilizado para armazenamento de configuração de aplicações distribuídas. É nele que o Kafka salva seus metadados, tais como localização de partições e configuração de tópicos. 

Schema Registry

Responsável por armazenar schemas do tipo AVRO, JSON e Protobuf,  versionando-os. Vale ressaltar que por baixo dele é utilizado o Kafka para armazenar tais informações.

Kafka Connect

A fim de facilitar a atuação e dispensar a necessidade de criar aplicativos consumidores para salvar dados, de um tópico em um serviço de armazenamento externo, nasceu o Kafka Connect. Ele atua como um facilitador na busca de dados de um tópico e na transcrição em uma fonte externa, seja no S3 ou em um banco de dados relacional e outros. Peça chave no nosso projeto.

Kafka Rest Proxy

Fornece uma interface RESTful para o nosso cluster Kafka. Objetiva otimizar o trabalho quanto à produção e consumo, além de executar outros comandos administrativos.

KSQL

Alternativa ao Kafka Streams, o KSQL facilita a criação de aplicações streams, em que se deseja buscar mensagens de um tópico e escrever em outro. Fato que evita a implementação de duas aplicações (produtor e consumidor), bem como manutenção em mais códigos. Como o próprio nome sugere, nele podemos utilizar declarações do tipo SQL.

Control Center

Interface web para analisar informações do nosso cluster, modificar configuração de tópicos e até mesmo executar alguns comandos diretamente com o KSQL. É simples e fornece algumas boas possibilidades.

III: Configuração do Broker

Depois de compreender todos os serviços agregados ao Kafka, definimos as seguintes configurações para alguns deles: 

Kafka

No arquivo /etc/kafka/server.properties descomentar as seguintes linhas:

metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=localhost:9092
confluent.metrics.reporter.topic.replicas=1

Control Center

Em /etc/confluent-control-center/control-center-production.properties, descomentar as seguintes linhas:

bootstrap.servers=localhost:9092 zookeeper.connect=localhost:2181

Depois, adicionar as seguintes linhas no final do arquivo:

confluent.controlcenter.internal.topics.partitions=1
confluent.controlcenter.internal.topics.replication=1
confluent.controlcenter.command.topic.replication=1
confluent.monitoring.interceptor.topic.partitions=1
confluent.monitoring.interceptor.topic.replication=1

Para o Control Center, pode ser utilizado o KSQL pela interface web, para isto deve-se descomentar e definir a seguinte propriedade.

confluent.controlcenter.ksql.url=http://localhost:8088

Schema Registry

Não foi necessário mudar nenhuma configuração, mas caso tenha mais de um broker, verificar quais propriedades alterar em /etc/schema-registry/schema-registry.properties.

IV: Iniciar serviços

Para iniciar todos estes serviços após estas modificações, executar:

$ sudo systemctl start confluent-zookeeper 
$ sudo systemctl start confluent-kafka 
$ sudo systemctl start confluent-schema-registry 
$ sudo systemctl start confluent-kafka-connect 
$ sudo systemctl start confluent-kafka-rest 
$ sudo systemctl start confluent-ksql 
$ sudo systemctl start confluent-control-center

Para verificar o status de algum dos serviços:

$ systemctl status confluent*

V: Criação de tópicos

Aqui foram criados dois tópicos para atingirmos nosso objetivo. Um tópico irá receber os nossos eventos em JSON e o outro terá estes mesmos eventos no formato AVRO produzido através de um stream do KSQL. Para criar os tópicos basta executar os comandos:

Tópico para enviar os eventos em JSON:

$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic json_events

Tópico em que o s3 connector irá buscar os eventos para jogar no S3:

$ kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic events

VI: Criação de Streams com KSQL

Neste ponto, é necessário criar os streams do KSQL responsáveis por pegar os eventos no tópico json_events, convertê-lo no formato avro e jogar no tópico ‘events’.

Para abrir o console KSQL basta rodar o comando abaixo:

$ sudo ksql http://localhost:8088

Através disso, um console do KSQL será aberto. Deve-se, então, criar o stream de JSON, definir o tópico de origem e seu esquema.

CREATE STREAM events_source_json (id INT, name VARCHAR, city VARCHAR, age INT) WITH (KAFKA_TOPIC='json_events', VALUE_FORMAT='JSON');

O segundo stream irá gerar dados no formato AVRO baseado nos eventos do stream ‘events_source_json’ e inserir no tópico ‘events’.

CREATE STREAM events_target_avro WITH (KAFKA_TOPIC='events',PARTITIONS=1,VALUE_FORMAT='AVRO') AS SELECT * FROM events_source_json;

Se desejar conferir os streams criados, execute o comando:

$ show streams;

E para visualizar os dados após conversão para avro use:

$ print ‘events’;

Com isso, boa parte do processo está concluída. Caso queira testar o fluxo até aqui, gere alguns eventos no console do Kafka producer, para isto:. 

$ kafka-console-producer --broker-list localhost:9092 --topic json_events

Neste console basta colar eventos no formato que foi definido o schema anteriormente. Por exemplo:

{“id”: 1, “name”: “Fulano Beltrano da Silva”, “city”: “Belo Horizonte”, “age”: 27}

Ao realizar esses passos você poderá visualizar, em outro terminal no console do KSQL, os dados que foram convertidos em avro, por meio do comando citado acima. É importante ressaltar que os eventos aparecem conforme forem inseridos e que é neste processo que o nome das colunas são convertidos para uppercase

De acordo com os estudos que realizamos, até o momento, esse é o único formato possível. 

VII: Configuração do Kafka Connect Sink

Essa é parte determinante do processo, caracterizada pelo envio dos dados para um bucket do S3. Deve-se utilizar, para isso, um conector de ingestão da confluent platform – aplicação do tipo consumidor especializada em buscar dados de um tópico e inserir em uma base, seja um banco de dados ou no S3. 

Claro que você pode construir uma aplicação que consome de um tópico do kafka e escreve em parquet no S3. Mas por que reinventar a roda quando já existe uma solução que já faz isto para você e sem precisar codificar uma linha de código?!

O Kafka Connect Rest API já é executado na porta 8083, portanto, basta  definirmos uma configuração para o nosso consumidor e submetê-la. Segue uma possível configuração:

{
"name" :"events-s3-connector",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.region": "us-east-1",
"s3.bucket.name": "meu-bucket",
"topics.dir": "",
"flush.size": "100000",
"rotate.interval.ms": "360000",
"auto.register.schemas": "false",
"tasks.max": "1",
"timezone": "UTC",
"parquet.codec": "snappy",
"topics": "events",
"schema.registry.url": "http://localhost:8081",
"s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "YYYY/MM/dd/HH",
"partition.duration.ms": "3600000",
"locale": "en-US",
"timestamp.extractor": "Record",
"value.converter.schema.registry.url": "http://localhost:8081"
}
}

Em continuidade, salve a configuração em um arquivo com nome s3_sink_events.json, por exemplo. Nesta especificação, está definido que a cada 100000 eventos, em ‘flush.size’, haverá a compressão e envio para o bucket ‘meu-bucket’ que está localizado na região ‘us-east-1’. Outro detalhe é a propriedade ‘path.format’, que possibilitou a definição de um formato similar ao que é utilizado pelo Amazon Kinesis Firehose. Além disso, em ‘topics’ está definido de qual tópico o sink irá buscar os eventos.

Para submeter esta configuração basta executar:

$ curl -X POST -H 'Content-Type:application/json' --data @"./s3_sink_events.json" http://localhost:8083/connectors/

Finalize com o envio de mais alguns eventos e verifique se foram enviados para o bucket especificado. E, caso você não use algum app para gerar milhares de eventos e enviar no tópico, sugerimos a alteração da propriedade ‘flush.size’ para um número menor, por exemplo: 5.

Considerações finais 

Com a POC conseguimos atingir o objetivo de substituir o Kinesis Firehose e entregar igualmente excelência e assertividade nos serviços, bem como a diminuição significativa dos custos. Mensuramos que, após a aplicação do Kafka,  reduzimos cerca de 75% dos gastos com essa parte da infraestrutura. Tudo isso a partir do uso de uma solução relativamente simples, mas que se mostrou apta ao processamento de milhões de eventos com alta performance. 

Na substituição vimos empiricamente o potencial e os benefícios trazidos pela plataforma de stream, o que nos motivou a trabalhar para adotar a metodologia em outras pontas de nossos serviços – na substituição até mesmo Kinesis Analytics e o Kinesis Streams. Além disso, percebemos que a implementação de novas ferramentas proporcionam uma relação de ganha-ganha, tanto em relação aos custos, quanto na otimização do trabalho realizado. E, é claro, preservando o padrão de qualidade na entrega.

Gostou do conteúdo e quer ver mais artigos técnicos no blog Cinnecta, ou testou a aplicação sugerida? Comente aqui e nos conte o que achou.

Para saber mais sobre os métodos utilizados por nós e sobre a atuação do time de Devs envie um e-mail para cinnecta@cinnecta.com 

Compartilhar no facebook
Compartilhar no twitter
Compartilhar no linkedin
Compartilhar no telegram
Compartilhar no whatsapp
Compartilhar no email

Deixe aqui o seu comentário

Deixe uma resposta

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *