Kafka installazione e comandi Shell

Installazione su Redhat e Centos

Kafka installazione e comandi Shell. Nell’articolo precedente abbiamo visto che cosa è Kafka, quali sono le differenze tra Kafka e JMS. Ora in questo articolo vedremo come installare e utilizzare Kafka utilizzando i comandi Shell. Dopo aver capito il funzionamento andremo a sviluppare un microservizio con integrato il motore di messaggistica istantanea di Kafka. Ma prima vediamo come installare manualmente l’ultima versione di Kafka.

In questo link trovate tutti pacchetti per installare Kafka e Zookeeper. Download

Iniziamo con lo scaricare l’ultima versione di Kafka per S.O. Redhat e CentOS.

wget https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz -O /opt/kafka_2.12-2.3.0.tgz

Una volta completato il download facciamo accesso alla directory dove si trova il file .tgz e lo scompattiamo.

cd /opt
tar -xvf kafka_2.12-2.3.0.tgz

Ora creiamo un link simbolico per facilitare l’accesso alla directory

ln -s /opt/kafka_2.11-2.1.0 /opt/kafka

Passiamo alla creazione dell’utente “non-privileged user” che avrà il compito di gestire e utilizzare i servizi zookeeper e kafka.

useradd kafka



Diamo i privilegi all’utente “kafka” per la directory /opt/kafka*

chown -R kafka:kafka /opt/kafka*

Ora creiamo il file ” /etc/systemd/system/zookeeper.service” è andremo ad inserire il seuente codice

[Unit]
Description=zookeeper
After=syslog.target network.target

[Service]
Type=simple

User=kafka
Group=kafka

ExecStart=/opt/kafka/bin/zookeeper-server-start.sh /opt/kafka/config/zookeeper.properties
ExecStop=/opt/kafka/bin/zookeeper-server-stop.sh

[Install]
WantedBy=multi-user.target

Passiamo alla creazione del file ” /etc/systemd/system/kafka.service” per il servizio Kafka

[Unit]
Description=Apache Kafka
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple

User=kafka
Group=kafka

ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

Ora facciamo ripartire i servizi con questa istruzione

# systemctl daemon-reload

Facciamo partire i servizi con queste 2 istruzioni

# systemctl start zookeeper
# systemctl start kafka

Prima di iniziare lo sviluppo verifichiamo che i servizi stanno girando correttamente. Digitiamo queste istruzioni e vediamo cosa risponde il sistema.



Verifichiamo se Zookeeper è partito. Eseguiamo questo comando nella shell aperta.

# systemctl status zookeeper.service

zookeeper.service - zookeeper
   Loaded: loaded (/etc/systemd/system/zookeeper.service; disabled; vendor preset: disabled)
   Active: active (running) since Sat 2019-09-21 14:46:47 PDT; 1min 38s ago
 Main PID: 25446 (java)
    Tasks: 38
   CGroup: /system.slice/zookeeper.service
           └─25446 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -D...

Sep 21 14:46:49 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:46:49,627] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.z...actory)
Sep 21 14:47:29 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:29,998] INFO Accepted socket connection from /127.0.0.1:57386 (...actory)
Sep 21 14:47:30 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:30,006] INFO Client attempting to establish new session at /127...Server)
Sep 21 14:47:30 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:30,009] INFO Creating new log file: log.1 (org.apache.zookeeper...TxnLog)
Sep 21 14:47:30 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:30,039] INFO Established session 0x100039b7fcc0000 with negotia...Server)
Sep 21 14:47:30 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:30,169] INFO Got user-level KeeperException when processing ses...cessor)
Sep 21 14:47:30 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:30,181] INFO Got user-level KeeperException when processing ses...cessor)
Sep 21 14:47:30 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:30,187] INFO Got user-level KeeperException when processing ses...cessor)
Sep 21 14:47:30 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:30,929] INFO Got user-level KeeperException when processing ses...cessor)
Sep 21 14:47:33 localhost.localdomain zookeeper-server-start.sh[25446]: [2019-09-21 14:47:33,483] INFO Got user-level KeeperException when processing sessionid:...
Hint: Some lines were ellipsized, use -l to show in full.

Verifichiamo lo stato di Kafka

# systemctl status kafka.service

kafka.service - Apache Kafka
   Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: disabled)
   Active: active (running) since Sat 2019-09-21 14:47:27 PDT; 1min 59s ago
 Main PID: 25840 (java)
    Tasks: 80
   CGroup: /system.slice/kafka.service
           └─25840 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava...

Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,288] INFO [ProducerId Manager 0]: Acquired new producerId block ...anager)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,326] INFO [TransactionCoordinator id=0] Starting up. (kafka.coor...inator)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,332] INFO [TransactionCoordinator id=0] Startup complete. (kafka...inator)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,333] INFO [Transaction Marker Channel Manager 0]: Starting (kafk...anager)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,443] INFO [/config/changes-event-process-thread]: Starting (kafk...Thread)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,465] INFO [SocketServer brokerId=0] Started data-plane processor...Server)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,479] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.Ap...Parser)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,480] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.com...Parser)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,480] INFO Kafka startTimeMs: 1569102453468 (org.apache.kafka.com...Parser)
Sep 21 14:47:33 localhost.localdomain kafka-server-start.sh[25840]: [2019-09-21 14:47:33,499] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Hint: Some lines were ellipsized, use -l to show in full.

Facciamo ripartire i servizi al boot eseguendo questi comandi

# systemctl enable zookeeper.service
Created symlink from /etc/systemd/system/multi-user.target.wants/zookeeper.service to /etc/systemd/system/zookeeper.service.

# systemctl enable kafka.service
Created symlink from /etc/systemd/system/multi-user.target.wants/kafka.service to /etc/systemd/system/kafka.service.

Kafka comandi Shell per Consumer e Producer

Per concludere l’installazione facciamo un test sul servizio eseguendo i seguenti comandi. Prima di tutto creiamo il nostro primo topic eseguendo il comando che segue

# /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic APFirstTopic

Vediamo insieme Se tutto è andato bene e il topic è stato creato avremo questa risposta “Created topic APFirstTopic.“.

Ora facciamo sottoscrivere il nostro primo “consumer” al nostro topic e lo lasciamo in attesa di ricevere il nostro primo messaggio.

# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic APFirstTopic --from-beginning

Inviamo il nostro primo messaggio digitando l’istruzione che segue in un altro terminale come “Producer”. prima di farlo vediamo come interrogare Kafka per conoscere i topic esistenti.

# /opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

Nella lista vedremo il nostro primo topic “APFirstTopic“. Ora mandiamo il nostro primo messaggio. Eseguiamo questa istruzione avendo cura di indicare il nome del primo topic che abbiamo creato

# /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic APFirstTopic

Il terminale rimane in attesa del tuo primo messaggio. Quindi digitiamo il testo del messaggio che vogliamo inviare a tutti i Consumer iscritti al topic.

# /opt/kafka/bin/kafka-console-producer.sh –broker-list localhost:9092 –topic APFirstTopic

> Questo è il mio primo messaggio

Nel terminale dove è stato lanciato il comando del “consumer” vedremo questo

# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic APFirstTopic --from-beginning
Questo è il primo messaggio

Potrebbero interessarti anche...