#Kafka installation prerequisites
All applications generate information when running; this information is stored as logs. As a system administrator, you need to monitor these logs to ensure the proper functioning of the system and thereby prevent risks and errors. These logs are normally scattered over many servers, and management becomes harder as the data volume increases.

Graylog is a free and open-source log management tool that can be used to capture, centralize and view real-time logs from several devices across a network. It can be used to analyze both structured and unstructured logs. The Graylog setup consists of MongoDB, Elasticsearch, and the Graylog server. The server receives data from the clients installed on several servers and displays it on the web interface. Below is a diagram illustrating the Graylog architecture.

Graylog offers the following features:

Log collection – Graylog's modern log-focused architecture can accept nearly any type of structured data, including log messages and network traffic from syslog (TCP, UDP, AMQP, Kafka), AWS (AWS Logs, Flow Logs, CloudTrail), JSON Path from HTTP API, Beats/Logstash, plain/raw text (TCP, UDP, AMQP, Kafka), etc.
Log analysis – Graylog really shines when exploring data to understand what is happening in your environment. It uses enhanced search, search workflows and dashboards.
Extracting data – whenever a log management system is in operation, there will be summary data that needs to be passed somewhere else in your operations center. Graylog offers several options that include scheduled reports, a correlation engine, a REST API and a data forwarder.
Enhanced security and performance – Graylog often contains sensitive, regulated data, so it is critical that the system itself is secure, accessible, and speedy. This is achieved using role-based access control, archiving, fault tolerance, etc.
Extendable – with the phenomenal open source community, extensions are built and made available in the marketplace to improve the functionality of Graylog.

This guide will walk you through how to run the Graylog server in Docker containers. This method is preferred since you can run and configure Graylog with all the dependencies, Elasticsearch and MongoDB, already bundled.

Setup Prerequisites

Before we begin, you need to update the system and install the required packages.

## On Debian/Ubuntu
sudo apt update && sudo apt upgrade
sudo apt install curl vim git

## On RHEL/CentOS/RockyLinux 8
sudo yum -y update
sudo yum -y install curl vim git

## On Fedora
sudo dnf update
sudo dnf -y install curl vim git

1. Install Docker and Docker-Compose on Linux

Of course, you need the Docker engine to run the Docker containers. To install the Docker engine, use the dedicated guide below:

How To Install Docker CE on Linux Systems

Once installed, check the installed version.

$ docker -v
Docker version 20.10.13, build a224086

You also need to add your system user to the docker group. This will allow you to run docker commands without using sudo.

sudo usermod -aG docker $USER
newgrp docker

With Docker installed, proceed and install docker-compose using the guide below:

How To Install Docker Compose on Linux

Verify the installation.

$ docker-compose version
Docker Compose version v2.3.3

Now start and enable Docker to run automatically on system boot.

sudo systemctl start docker && sudo systemctl enable docker

2. Provision the Graylog Container

The Graylog container will consist of the Graylog server, Elasticsearch, and MongoDB.
To achieve this, we will capture the information and settings in a YAML file. Create the YAML file as below:

vim docker-compose.yml

In the file, add the below lines:

version: '2'
services:
  # MongoDB: https://hub.docker.com/_/mongo/
  mongodb:
    image: mongo:4.2
    networks:
      - graylog
    # DB in share for persistence
    volumes:
      - /mongo_data:/data/db
  # Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/7.10/docker.html
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2
    # data folder in share for persistence
    volumes:
      - /es_data:/usr/share/elasticsearch/data
    environment:
      - http.host=0.0.0.0
      - transport.host=localhost
      - network.host=0.0.0.0
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    mem_limit: 1g
    networks:
      - graylog
  # Graylog: https://hub.docker.com/r/graylog/graylog/
  graylog:
    image: graylog/graylog:4.2
    # journal and config directories in local NFS share for persistence
    volumes:
      - /graylog_journal:/usr/share/graylog/data/journal
    environment:
      # CHANGE ME (must be at least 16 characters)!
      - GRAYLOG_PASSWORD_SECRET=somepasswordpepper
      # Password: admin
      - GRAYLOG_ROOT_PASSWORD_SHA2=e1b24204830484d635d744e849441b793a6f7e1032ea1eef40747d95d30da592
      - GRAYLOG_HTTP_EXTERNAL_URI=http://192.168.205.4:9000/
    entrypoint: /usr/bin/tini -- wait-for-it elasticsearch:9200 -- /docker-entrypoint.sh
    networks:
      - graylog
    links:
      - mongodb:mongo
      - elasticsearch
    restart: always
    depends_on:
      - mongodb
      - elasticsearch
    ports:
      # Graylog web interface and REST API
      - 9000:9000
      # Syslog TCP
      - 1514:1514
      # Syslog UDP
      - 1514:1514/udp
      # GELF TCP
      - 12201:12201
      # GELF UDP
      - 12201:12201/udp
# Volumes for persisting data, see https://docs.docker.com/engine/admin/volumes/volumes/
volumes:
  mongo_data:
    driver: local
  es_data:
    driver: local
  graylog_journal:
    driver: local
networks:
  graylog:
    driver: bridge

In the file, replace:

GRAYLOG_PASSWORD_SECRET with your own password, which must be at least 16 characters.
GRAYLOG_ROOT_PASSWORD_SHA2 with a SHA2 hash of your password, obtained using the command:

echo -n "Enter Password: " && head -1 </dev/stdin | tr -d '\n' | sha256sum | cut -d" " -f1

Once the file is ready, bring up the stack and verify that the three containers are running:

docker-compose up -d
docker ps

...            graylog/graylog:4.2                                         ...                      ...              ...             1514/tcp, :::1514->1514/tcp, 0.0.0.0:9000->9000/tcp, 0.0.0.0:1514->1514/udp, :::9000->9000/tcp, :::1514->1514/udp, 0.0.0.0:12201->12201/tcp, 0.0.0.0:12201->12201/udp, :::12201->12201/tcp, :::12201->12201/udp   thor-graylog-1
1a21d2de4439   docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2   "/tini -- /usr/local…"   31 seconds ago   Up 28 seconds   9200/tcp, 9300/tcp   thor-elasticsearch-1
1b187f47d77e   mongo:4.2                                                   "docker-entrypoint.s…"   31 seconds ago   Up 28 seconds   27017/tcp            thor-mongodb-1

If you have a firewall enabled, allow the Graylog service port through it.

## For Firewalld
sudo firewall-cmd --zone=public --add-port=9000/tcp --permanent
sudo firewall-cmd --reload

## For UFW
sudo ufw allow 9000/tcp

5. Access the Graylog Web UI

Now open the Graylog web interface using the URL http://IP_address:9000. Log in using the username admin and the password (StrongPassw0rd) whose SHA2 hash was set in the YAML.

On the dashboard, let's create the first input to get logs by navigating to the System tab and selecting Inputs. Now search for Raw/Plaintext TCP and click Launch new input. Once launched, a pop-up window will appear as below. You only need to change the name of the input, the port (1514), and select the node, or "Global", as the location for the input. Leave the other details as they are.

Save the input and try sending a plain text message to the Graylog Raw/Plaintext TCP input on port 1514.

echo 'First log message' | nc localhost 1514

## OR from another server
echo 'First log message' | nc 192.168.205.4 1514

On the running Raw/Plaintext input, show received messages. The received message should be displayed as below. You can as well export this to a dashboard as below. Create the dashboard by providing the required information. You will have the dashboard appear under the Dashboards tab.

Conclusion

That is it! We have walked through how to run the Graylog server in Docker containers. Now you can monitor and access logs on several servers with ease. I hope this guide was useful to you.
Installing and running Kafka on AWS instance (CentOS)
In this blog we will install and start a single-node deployment of the latest and recommended version of Kafka, i.e. 0.10.2.0 with the binary for Scala 2.12, on an EC2 Linux instance running CentOS. We will be using a t2.micro (free tier) instance, which comes with 1 GB RAM and 8 GB SSD.
Prerequisite: 1) Create an EC2 instance
Steps for creating an AWS instance are clearly mentioned in…
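For a sense of what the Kafka part of that setup involves once the instance is up, the 0.10.2.0 / Scala 2.12 build named above boils down to something like the following sketch (a production setup would adjust the configuration files first):

$ wget https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
$ tar -xzf kafka_2.12-0.10.2.0.tgz
$ cd kafka_2.12-0.10.2.0

# Start ZooKeeper first, then the Kafka broker
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties

# Quick smoke test: create a topic and list it
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
$ bin/kafka-topics.sh --list --zookeeper localhost:2181

On a 1 GB t2.micro, lowering the broker and ZooKeeper heap sizes is usually necessary before the services will stay up.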
Change data capture from Neo4j to Amazon Neptune using Amazon Managed Streaming for Apache Kafka
After you perform a point-in-time data migration from Neo4j to Amazon Neptune, you may want to capture and replicate ongoing updates in real time. For more information about automating point-in-time graph data migration from Neo4j to Neptune, see Migrating a Neo4j graph database to Amazon Neptune with a fully automated utility. This post walks you through the steps to automate the capture and replication from Neo4j to Neptune, using an example solution on the cdc-neo4j-msk-neptune GitHub repo.

Continuous replication of databases using the change data capture (CDC) pattern allows you to stream your data and make it available to other systems. This post focuses on modernizing your graph database by streaming data from Neo4j using CDC so that you have the latest changes copied into Neptune. By using the Event Interception strategy of the Strangler pattern to modernize Neo4j, you can incrementally push all your changes to Neptune and modify your applications to use Neptune.

Neptune is a fast, reliable, fully managed graph database service that makes it easier to build and run applications that work with highly connected datasets. The core of Neptune is a purpose-built, high-performance graph database engine optimized for storing billions of relationships and querying the graph with millisecond latency.

Architecture overview

The solution in this post automates the deployment of the following architecture in your AWS account. This architecture shows the AWS resources the solution provisions to build a loosely coupled system for the replication. The architecture contains the following elements:

An AWS Cloud Development Kit (AWS CDK) app that an end user triggers, which bootstraps all the required AWS resources inside an Amazon VPC
An Amazon Elastic Compute Cloud (Amazon EC2) instance to run purpose-built services running in Docker containers for the replication
A single-node Neptune DB cluster with one graph database instance that serves as the target of this replication
An Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster with two nodes that serves as the publish-subscribe broker for this replication

Launching the example solution

With this solution, you should expect to see changes made to the nodes and relationships in the Neo4j graph database reflected in Neptune in real time. To get started, clone the AWS CDK app from the GitHub repo. After making sure you meet the prerequisites, follow the instructions on GitHub to run the solution.

Deep dive into the solution

CDC is an architecture pattern that identifies changes in data in a source system and acts on those changes. In this solution, you determine data changes in a Neo4j graph and act on them by transforming them and updating a target Neptune graph in a simple three-step process:

1. Provision AWS resources
2. Process and replicate the changed data
3. Test the end-to-end solution

Provisioning AWS resources

For a fully automated experience, it's important to provision the required resources and configure their plumbing, such as applying the right AWS Identity and Access Management (IAM) roles and policies. This enables you to run and test it in your AWS account. This automation provides isolation by creating a separate VPC and launching resources in it. This makes it easy for you to set up and tear down without worrying about any dependencies on your existing environment.
After following the steps to run the solution, you see an output similar to the following code. As a result, you create the following resources in your AWS account:

Amazon VPC – The VPC creates an isolated network that makes sure the solution is created and destroyed without affecting the rest of your AWS development account. Inside the VPC, the app creates one public and one private subnet in two Availability Zones.
Amazon EC2 – A single EC2 instance is used to run the purpose-built services in Docker containers.
Security groups and IAM policies – The EC2 instance needs to talk to Neptune and Amazon MSK for the replication to work. The setup app creates security groups, IAM roles, and policies to ensure that services can securely connect and talk to each other.
Amazon MSK – Neo4j Streams for Kafka emits changes from a source database to Kafka in real time. Amazon MSK is the fully managed Kafka service that you use in this solution to integrate with Neo4j and Neptune.
Neptune – You use this fully managed AWS graph database service as the modernization target.

Processing and replicating the changed data

The EC2 instance you provision runs the following services:

startup-service – This Docker container determines the Neptune and Amazon MSK endpoints.
neo4j-service – This Docker container runs Neo4j version 4.0.0 and has the apoc version 4.0.0.6 and neo4j-streams version 4.0.0 plugins installed. This service is configured to publish all changes using the following default values (follow the instructions in the GitHub repo to find out how to change them):
  Nodes: Person{*}, Movie{*}
  Relationships: ACTED_IN{*}
  Amazon MSK topic name: movie-topic
kafka-topic-service – This Docker container creates a new Amazon MSK topic. The neo4j-service publishes changed data to this topic, and the transformation-service subscribes to this topic to get the changed data. You can also configure Amazon MSK to create new topics automatically using auto.create.topics.enable by creating a custom configuration.
transformation-service – The Neptune property graph is very similar to Neo4j's, including support for multiple labels on vertices, and multi-valued properties (sets but not lists). Neo4j allows homogeneous lists of simple types that contain duplicate values to be stored as properties on both nodes and edges. Neptune, on the other hand, provides for set and single cardinality for vertex properties, and single cardinality for edge properties. The transformation-service is designed to accept changed data from Neo4j before transforming it into Neptune's graph data model.

Data flow architecture

The following diagram illustrates the data flow architecture and how these services work with each other. The data flow contains the following steps:

1. The user-data shell script of the instance uses docker-compose to launch the four Docker containers. Using user-data scripts is a common pattern to run startup scripts when an instance is launched. For this solution, you use it to launch and configure the services.
2. The first service to start is startup-service. You need this service to query AWS CloudFormation describe-stack for the MSK cluster endpoint address. You need this as a separate step because the cluster endpoint isn't available until the cluster is created. After getting the endpoint address, the service queries it to retrieve the Kafka bootstrap and ZooKeeper addresses and ports. You use these addresses to configure the Neo4j Streams plugin so that it can send changes to Amazon MSK.
3. startup-service queries the CloudFormation stack for the Neptune endpoint. Although the AWS CDK stack outputs the Neptune cluster endpoint, it's a runtime output and isn't available while the stack is running.
4. kafka-topic-service creates a new topic in Amazon MSK.
5. When the Neo4j graph database running in neo4j-service receives a Cypher script to run, it publishes changed data to the Amazon MSK topic. An interactive user or any other service writing to the Neo4j graph can perform the operation.
6. transformation-service, subscribed to the Amazon MSK topic, receives the data and processes it by transforming it from Neo4j's data model to Neptune's.
7. transformation-service pushes the transformed data to Neptune.

Testing the end-to-end solution

The following diagram illustrates the steps to perform an end-to-end test of the solution. You complete the following steps:

1. SSH into your EC2 instance.
2. Run the following shell script to enter the neo4j-service Docker container:

docker container exec -it neo4j-service cypher-shell

3. At the neo4j prompt, run the following Cypher scripts:

CREATE (TheMatrix:Movie {title:'The Matrix', released:1999, tagline:'Welcome to the Real World'});
CREATE (Keanu:Person {name:'Keanu Reeves', born:1964});
CREATE (Keanu)-[:ACTED_IN {roles:['Neo']}]->(TheMatrix);

4. The transformation-service saves all the debug information in a local file. As an optional step, to see the logs, run the following shell script:

docker container logs transformation-service

5. Run the following shell script to launch an Apache TinkerPop Gremlin console configured to send all queries to Neptune (this step verifies that the Neptune graph is in sync with changes in the source):

docker run -it -e NEPTUNE_HOST --entrypoint /replace-host.sh sanjeets/neptune-gremlinc-345

6. At the Gremlin prompt, run the following commands in order:

:remote console
g.V().count()

Extending the solution

This solution has a loosely coupled architecture. If you want to replace the transformation-service with your own, you can easily do so by providing a new implementation in a Docker container. You have to change the Docker Compose file 02-docker-compose.yml to replace the transformation-service. Similarly, you can replace other services in the solution. For example, you could replace the Neo4j Docker container.

Instead of using the Gremlin console in a Docker container, if you prefer, you can quickly and easily query your Neptune databases with Jupyter notebooks, which are fully managed, interactive development environments with live code and narrative text. Notebooks are hosted and billed through Amazon SageMaker.

Scaling the solution

The modular architecture of this solution allows you to scale the transformation-service independently to meet a high-throughput change data capture requirement. Also, by monitoring Amazon Neptune, you should be able to scale it up or down as needed. The following patterns will help you run this solution at scale in real-world scenarios.

Scaling the transformation-service with Amazon MSK

For simplicity, this solution uses a single Kafka consumer and a single partition. If you want this solution to scale, you may want to create multiple partitions and multiple consumers in a consumer group, as shown in the following architecture. This takes care of a large volume of CDC from the source database by allowing you to launch multiple instances of the transformation-service container. Your new architecture looks similar to the following diagram.
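To make the consumer-group idea concrete, here is a minimal kafka-python sketch of the pattern; it is not the repo's actual transformation-service, and the broker address, group name and handler are placeholders:

from kafka import KafkaConsumer

def handle_change(payload):
    # Placeholder for the real transform-and-write-to-Neptune logic
    print(payload)

# Every copy of this process that uses the same group_id joins one consumer
# group, so Kafka splits the topic's partitions across the running copies.
consumer = KafkaConsumer(
    'movie-topic',                        # default topic name used by the solution
    bootstrap_servers=['broker-1:9092'],  # placeholder MSK bootstrap address
    group_id='transformation-service',    # shared group id across all instances
    auto_offset_reset='earliest',
)

for message in consumer:
    handle_change(message.value)

With more partitions on the topic than running instances, adding another instance simply triggers a rebalance and spreads the load further.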
How Neptune scales with load

Neptune DB clusters and instances scale at three different levels: storage, instance, and read. Depending upon the optimization, after closely monitoring your Neptune cluster, you can independently fine-tune each of these scaling levels.

Monitoring Neptune

The following screenshot shows various metrics available by default as a dashboard view on the Neptune console. To monitor the CDC performance (for example, to inspect the raw request and the payload containing the Gremlin or SPARQL query), you might want to make the following changes:

Enable Neptune audit logs
Configure the Neptune DB cluster to publish audit log data to a log group in Amazon CloudWatch Logs

Cost of running the solution

The following estimates outline an hourly cost of running this solution with on-demand pricing in us-west-2. Changing the instance type defaults in the cdk.json file changes your cost. The storage, I/O, and data transfer rates are assumptions made to simplify calculation. All the prices are as of this writing and might change over time. For more information and to perform a thorough calculation, see the pricing page of each service.

Amazon EC2 – instance type (A): t3a.xlarge; EBS storage (B): 100 GB; data transfer (C): free within the same AZ; price per hour: see Amazon EC2 pricing; estimated hourly cost (A+B+C): $0.1504 + $0.01 + 0 = $0.1604
Neptune – instance type (A): db.r5.large; data stored (B): 100 GB; data transfer (C): free within the same AZ

Source: https://aws.amazon.com/blogs/database/change-data-capture-from-neo4j-to-amazon-neptune-using-amazon-managed-streaming-for-apache-kafka/
Apache Kafka Cluster Installation Guide
This document will provide you with instructions for installing an Apache Kafka cluster and setting up ZooKeeper.
Download the latest version of Apache Kafka from the project's website and extract it.
At the time of writing, the current stable version is 1.0.8, which I am using on most of the installations.
Prerequisite
Download JRE from the site. http://w…
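For a sense of what the ZooKeeper side of a Kafka cluster involves, a three-node ensemble's configuration might look like the following sketch (hostnames and paths are placeholders):

# conf/zoo.cfg on each ZooKeeper node
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

# each node also needs its own id (1, 2 or 3) written to dataDir/myid

# config/server.properties on each Kafka broker
broker.id=1   # unique per broker
zookeeper.connect=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181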
A Minimalist Guide to Apache Flume
Apache Flume is used to collect, aggregate and distribute large amounts of log data. It can operate in a distributed manner and has various fail-over and recovery mechanisms. I've found it most useful for collecting log lines from Kafka topics and grouping them together into files on HDFS.
The project started in 2011 with some of the earliest commits coming from Jonathan Hsieh, Hari Shreedharan and Mike Percy, all of whom either currently, or at one point, worked for Cloudera. As of this writing the code base is made up of 95K lines of Java.
The building blocks of any Flume agent's configuration are one or more sources of data, one or more channels to transmit that data and one or more sinks to send the data to. Flume is event-driven; it's not something you'd trigger on a scheduled basis. It runs continuously and reacts to new data being presented to it. This contrasts with tools like Airflow, which run scheduled batch operations.
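To make the source/channel/sink trio concrete before diving in, a minimal agent definition in the style of the upstream quickstart (netcat source, memory channel, logger sink) looks something like the sketch below; the Kafka-to-HDFS configuration used later in this post follows the same shape.

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1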
In this post I'll walk through feeding Nginx web traffic logs into Kafka, enriching them using Python and feeding Flume those enriched records for storage on HDFS.
Installing Prerequisites
The following was run on a fresh Ubuntu 16.04.2 LTS installation. The machine I'm using has an Intel Core i5 4670K clocked at 3.4 GHz, 8 GB of RAM and 1 TB of mechanical storage capacity.
First I've setup a standalone Hadoop environment following the instructions from my Hadoop 3 installation guide. Below I've installed Kafkacat for feeding and reading off of Kafka, libsnappy as I'll be using Snappy compression on the Kafka topics, Python, Screen for running applications in the background and Zookeeper which is used by Kafka for coordination.
$ sudo apt update
$ sudo apt install \
    kafkacat \
    libsnappy-dev \
    python-pip \
    python-virtualenv \
    screen \
    zookeeperd
I've created a virtual environment for the Python-based dependencies I'll be using. In it I've installed a web traffic log parser, MaxMind's IPv4 location lookup bindings, Pandas, Snappy bindings for Python and a browser agent parser.
$ virtualenv ~/.enrich
$ source ~/.enrich/bin/activate
$ pip install \
    apache-log-parser \
    geoip2 \
    kafka-python \
    pandas \
    python-snappy \
    user-agents
MaxMind's database is updated regularly. Below I've downloaded the latest version and stored it in my home folder.
$ wget -c http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz
$ tar zxf GeoLite2-City.tar.gz
$ mv GeoLite2-City_*/GeoLite2-City.mmdb ~/
Flume & Kafka Up & Running
Below I've installed Flume and Kafka from their respective binary distributions.
$ DIST=http://www-eu.apache.org/dist
$ wget -c -O flume.tar.gz $DIST/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
$ wget -c -O kafka.tgz $DIST/kafka/1.1.1/kafka_2.11-1.1.1.tgz
I've stripped the documentation from Flume as it creates ~1,500 files. My view is that documentation should live anywhere but production.
$ sudo mkdir -p /opt/{flume,kafka}
$ sudo tar xzvf kafka.tgz \
    --directory=/opt/kafka \
    --strip 1
$ sudo tar xvf flume.tar.gz \
    --directory=/opt/flume \
    --exclude=apache-flume-1.9.0-bin/docs \
    --strip 1
I'll create and take ownership of the Kafka logs folder so that I can run the service without needing elevated permissions. Make sure to replace mark with the name of your UNIX account.
$ sudo mkdir -p /opt/kafka/logs
$ sudo chown -R mark /opt/kafka/logs
I'll launch the Zookeeper service and for the sake of simplicity, I'll run Kafka in a screen. I recommend Supervisor for keeping Kafka up and running in production.
$ sudo /etc/init.d/zookeeper start
$ screen
$ /opt/kafka/bin/kafka-server-start.sh \
    /opt/kafka/config/server.properties
Hit CTRL-a and then CTRL-d to detach from the screen session and return to the originating shell.
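For the Supervisor route recommended above, a program definition along the following lines could keep the broker running; this is just a sketch, with the paths matching this install and the user name from earlier:

[program:kafka]
command=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
directory=/opt/kafka
user=mark
autostart=true
autorestart=true
stopsignal=TERM
stdout_logfile=/var/log/supervisor/kafka.log
redirect_stderr=true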
I'll create two Kafka topics. The first, nginx_log, will be fed the traffic logs as they were generated by Nginx. I'll then have a Python script that will parse, enrich and store the logs in CSV format in a separate topic called nginx_enriched. Since this is a standalone setup with a single disk I'll use a replication factor of 1.
$ for TOPIC in nginx_log nginx_enriched; do
      /opt/kafka/bin/kafka-topics.sh \
          --zookeeper 127.0.0.1:2181 \
          --create \
          --partitions 1 \
          --replication-factor 1 \
          --topic $TOPIC
  done
Below is the configuration for the Flume agent. It will read messages off the nginx_enriched Kafka topic and transport them using a memory channel to HDFS. The data will initially live in a temporary folder on HDFS until the record limit has been reached, at which point it'll store the resulting files under a /kafka topic name/year/month/day naming convention for the folder hierarchy. The records are stored in CSV format. Later on Hive will have a table pointed at this folder giving SQL access to the data as it comes in.
$ vi ~/kafka_to_hdfs.conf
feed1.sources = kafka-source-1
feed1.channels = hdfs-channel-1
feed1.sinks = hdfs-sink-1

feed1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
feed1.sources.kafka-source-1.channels = hdfs-channel-1
feed1.sources.kafka-source-1.topic = nginx_enriched
feed1.sources.kafka-source-1.batchSize = 1000
feed1.sources.kafka-source-1.zookeeperConnect = 127.0.0.1:2181

feed1.channels.hdfs-channel-1.type = memory
feed1.channels.hdfs-channel-1.capacity = 1000
feed1.channels.hdfs-channel-1.transactionCapacity = 1000

feed1.sinks.hdfs-sink-1.channel = hdfs-channel-1
feed1.sinks.hdfs-sink-1.hdfs.filePrefix = hits
feed1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
feed1.sinks.hdfs-sink-1.hdfs.inUsePrefix = tmp/
feed1.sinks.hdfs-sink-1.hdfs.path = /%{topic}/year=%Y/month=%m/day=%d
feed1.sinks.hdfs-sink-1.hdfs.rollCount = 100
feed1.sinks.hdfs-sink-1.hdfs.rollSize = 0
feed1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
feed1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
feed1.sinks.hdfs-sink-1.type = hdfs
If you run into out of memory issues you can change the channel's type of "memory" to either "spillablememory" or "file". The Flume documentation covers how to tune these types of channels.
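For example, swapping the memory channel for a durable file channel would look something like the following sketch; the checkpoint and data directories are arbitrary choices here:

feed1.channels.hdfs-channel-1.type = file
feed1.channels.hdfs-channel-1.checkpointDir = /home/mark/flume/checkpoint
feed1.channels.hdfs-channel-1.dataDirs = /home/mark/flume/data
feed1.channels.hdfs-channel-1.capacity = 1000000
feed1.channels.hdfs-channel-1.transactionCapacity = 1000

A file channel trades some throughput for durability, since events survive an agent restart instead of living only in memory.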
I'll launch the Flume agent in a screen. This is another candidate for running under Supervisor in production.
$ screen
$ /opt/flume/bin/flume-ng agent \
    -n feed1 \
    -c conf \
    -f ~/kafka_to_hdfs.conf \
    -Dflume.root.logger=INFO,console
Hit CTRL-a and then CTRL-d to detach from the screen session and return to the originating shell.
Feeding Data into Kafka
I've created a sample Nginx web traffic log file. Here are what the first three lines of content look like.
1.2.3.4 - - [17/Feb/2019:08:41:54 +0000] "GET / HTTP/1.1" 200 7032 "-" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-"
1.2.3.4 - - [17/Feb/2019:08:41:54 +0000] "GET /theme/images/mark.jpg HTTP/1.1" 200 9485 "https://tech.marksblogg.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-"
1.2.3.4 - - [17/Feb/2019:08:41:55 +0000] "GET /architecting-modern-data-platforms-book-review.html HTTP/1.1" 200 10822 "https://tech.marksblogg.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.109 Safari/537.36" "-"
I'll feed these logs into the nginx_log Kafka topic. Each line will exist as an individual message in that topic.
$ cat access.log \
    | kafkacat -P \
               -b localhost:9092 \
               -t nginx_log \
               -z snappy
I can then check that the logs are stored as expected in Kafka.
$ kafkacat -C \
           -b localhost:9092 \
           -t nginx_log \
           -o beginning \
    | less -S
Enriching Nginx Logs
I'm going to use a Python script to read each of the log lines from Kafka, parse, enrich and store them back onto a new Kafka topic. The enrichment steps include attempting to look up the city of each visitor's IP address and parsing the user agent string into a simple browser name and version.
I've used a group identifier for consuming Kafka topics so that I can run multiple instances of this script and they can share the workload. This is handy for scaling out enrichment tasks that are bound by the compute resources of a single process.
I'll flush the newly created messages to Kafka every 500 messages. Note that this script expects there is always more data to push things along. If your dataset has a finite ending, there would need to be logic in place to push the unflushed records into Kafka.
from StringIO import StringIO

import apache_log_parser
import geoip2.database as geoip
from kafka import (KafkaConsumer, KafkaProducer)
import pandas as pd
from urlparse import urlparse
from user_agents import parse as ua_parse


geo_lookup = geoip.Reader('GeoLite2-City.mmdb')

log_format = r'%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"'
line_parser = apache_log_parser.make_parser(log_format)

group_id = 'nginx_log_enrichers'

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         group_id=group_id,
                         auto_offset_reset='smallest')
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         retries=5,
                         acks='all')

consumer.subscribe(['nginx_log'])

for msg_count, msg in enumerate(consumer):
    out = {}

    try:
        req = line_parser(msg.value)
    except apache_log_parser.LineDoesntMatchException as exc:
        print exc
        continue

    url_ = urlparse(req['request_url'])

    out['url_scheme'] = url_.scheme
    out['url_netloc'] = url_.netloc
    out['url_path'] = url_.path
    out['url_params'] = url_.params
    out['url_query'] = url_.query
    out['url_fragment'] = url_.fragment

    for key in ('remote_host',
                'request_method',
                'request_http_ver',
                'status',
                'response_bytes_clf',):
        out[key] = None

        if req.get(key, None):
            if type(req.get(key, None)) is bool:
                out[key] = req.get(key)
            elif len(req.get(key).strip()):
                out[key] = req.get(key).strip()

    agent_ = ua_parse(req['request_header_user_agent'])

    for x in range(0, 3):
        try:
            out['browser_%d' % x] = \
                agent_.browser[x][0] if x == 1 else agent_.browser[x]
        except IndexError:
            out['browser_%d' % x] = None

    location_ = geo_lookup.city(req['remote_host'])
    out['loc_city_name'] = location_.city.name
    out['loc_country_iso_code'] = location_.country.iso_code
    out['loc_continent_code'] = location_.continent.code

    output = StringIO()
    pd.DataFrame([out]).to_csv(output,
                               index=False,
                               header=False,
                               encoding='utf-8')

    producer.send('nginx_enriched', output.getvalue().strip())

    if msg_count and not msg_count % 500:
        producer.flush()
The enriched log lines look like the following prior to being serialised into CSV format.
{'browser_0': 'Chrome',
 'browser_1': 72,
 'browser_2': '72.0.3626',
 'loc_city_name': u'Tallinn',
 'loc_continent_code': u'EU',
 'loc_country_iso_code': u'EE',
 'remote_host': '1.2.3.4',
 'request_http_ver': '1.1',
 'request_method': 'GET',
 'response_bytes_clf': '7032',
 'status': '200',
 'url_fragment': '',
 'url_netloc': '',
 'url_params': '',
 'url_path': '/',
 'url_query': '',
 'url_scheme': ''}
While the above script is running I can see the following being reported by the Flume agent.
.. kafka.SourceRebalanceListener: topic nginx_enriched - partition 0 assigned.
.. hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
.. hdfs.BucketWriter: Creating /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp
.. hdfs.HDFSEventSink: Writer callback called.
.. hdfs.BucketWriter: Closing /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp
.. hdfs.BucketWriter: Renaming /nginx_enriched/year=2019/month=02/day=20/tmp/hits.1550663242571.tmp to /nginx_enriched/year=2019/month=02/day=20/hits.1550663242571
Setting Up Hive Tables
With the data landing in HDFS I'll create a table in Hive that will point to the CSV-formatted data. I'll also create a separate table that will hold a copy of that data in compressed, columnar form using ORC formatted-files. Presto will be used to convert the CSV-formatted data into ORC later on. Columnar form can be two orders of magnitude quicker to query and an order of magnitude smaller than row-oriented data.
CREATE EXTERNAL TABLE hits (
    browser_0            STRING,
    browser_1            INTEGER,
    browser_2            STRING,
    loc_city_name        STRING,
    loc_continent_code   VARCHAR(4),
    loc_country_iso_code VARCHAR(3),
    remote_host          VARCHAR(15),
    request_http_ver     FLOAT,
    request_method       VARCHAR(10),
    response_bytes_clf   BIGINT,
    security_researcher  STRING,
    status               SMALLINT,
    url_fragment         STRING,
    url_netloc           STRING,
    url_params           STRING,
    url_path             STRING,
    url_query            STRING,
    url_scheme           STRING
) PARTITIONED BY (year SMALLINT, month VARCHAR(2), day VARCHAR(2))
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  LOCATION '/nginx_enriched/';
CREATE TABLE hits_orc (
    browser_0            STRING,
    browser_1            INTEGER,
    browser_2            STRING,
    loc_city_name        STRING,
    loc_continent_code   VARCHAR(4),
    loc_country_iso_code VARCHAR(3),
    remote_host          VARCHAR(15),
    request_http_ver     FLOAT,
    request_method       VARCHAR(10),
    response_bytes_clf   BIGINT,
    security_researcher  STRING,
    status               SMALLINT,
    url_fragment         STRING,
    url_netloc           STRING,
    url_params           STRING,
    url_path             STRING,
    url_query            STRING,
    url_scheme           STRING
) PARTITIONED BY (year SMALLINT, month VARCHAR(2), day VARCHAR(2))
  STORED AS orc;
The data is partitioned by year, month and day on HDFS; both month and day can have leading zeros so I'll use the VARCHAR type to store them. I'll run the following to add any new partitions to the Hive metastore.
MSCK REPAIR TABLE hits;
MSCK REPAIR TABLE hits_orc;
I can now check that Hive can see the existing partition.
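In Hive this check is typically done with a SHOW PARTITIONS call:

SHOW PARTITIONS hits;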
year=2019/month=02/day=20
Converting CSVs to ORC Format
Finally, I'll convert the CSV-formatted table contents into a separate, ORC-formatted table using Presto. I've found Presto to be the fastest query engine for converting CSV data into ORC format.
$ presto \
    --server localhost:8080 \
    --catalog hive \
    --schema default
INSERT INTO hits_orc SELECT * FROM hits;
With the data loaded into ORC format I can run aggregate queries on the dataset.
SELECT loc_city_name, COUNT(*) FROM hits_orc GROUP BY 1;
 loc_city_name | _col1
---------------+-------
 Tallinn       |   119
Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.
Kafka – Install Apache Kafka on Mac
To install Apache Kafka on Mac, the following prerequisites are required:
Java
And finally, we shall set up Apache Kafka and run it on the Mac machine.
Install Java
Open a browser and hit the URL http://www.oracle.com/technetwork/java/javase/downloads/index.html.
Click on JDK, check “Accept License Agreement” and download .dmg file for installation on Mac.
Double click on the downloaded file and…
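Once the JDK is in place, the Kafka part of the setup generally amounts to downloading a binary release, extracting it and starting ZooKeeper followed by the broker. A rough sketch, where the version number is only an example:

$ java -version

$ curl -O https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-2.8.2.tgz
$ tar -xzf kafka_2.13-2.8.2.tgz
$ cd kafka_2.13-2.8.2

# In one terminal
$ bin/zookeeper-server-start.sh config/zookeeper.properties

# In a second terminal
$ bin/kafka-server-start.sh config/server.properties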
Illustration Photo: Communication nodes are installed alongside overhead and underground transformers. The nodes create a virtual energy network – collecting data from the smart meters and other digital equipment on the power lines, and sending it over a wireless network back to Duke Energy and then back to the customer. (credits: Duke Energy / Flickr Attribution-NonCommercial-NoDerivs 2.0 Generic (CC BY-NC-ND 2.0))
Security Concerns in IoT based Smart Manufacturing for Industry 4.0
Authors: Md. Faisal, Dr. Vinodini Katiyar
Source: INTERNATIONAL JOURNAL OF ENGINEERING SCIENCES & RESEARCH TECHNOLOGY 6(1) 218-221
Content Provider: Zenodo
To realize the need for and significance of security in manufacturing automation, consider the Internet of Things (IoT) in smart manufacturing. The initial step is to identify the sensor requirements for the machine parts from which real-time analytics will get its data – for example, continuous temperature changes from thermostat sensors, or connected sensors used to measure the speed of any moving part in a machine, which generate the data for speed measurements; similarly, a variety of sensors are available that can be used according to the requirements. The second step is to capture these data for analysis, for which we require an IoT gateway. The third step is to ingest the continuous data through popular software tools such as Flume or Kafka. The fourth step is to perform real-time analysis so as to provide real-time visualization remotely, which is usually done in the cloud. Thus, providing software as a service in the cloud is a great challenge for IT security providers, and strong security measures are really required to protect private data from attackers. Apart from that, this paper discusses the requirements of security, security configuration in smart manufacturing and end-to-end protection of data.
Check more https://adalidda.com/posts/AzKgcqhoLXXcDyoBS/security-concerns-in-iot-based-smart-manufacturing-for
Fluent Bit is an open source, lightweight log processing and forwarding service. Fluent Bit allows you to collect logs, events or metrics from different sources and process them. This data can then be delivered to different backends such as Elasticsearch, Splunk, Kafka, Datadog, InfluxDB or New Relic. Fluent Bit is easy to set up and configure. It gives you full control of what data to collect, parsing the data to provide a structure to the data collected. It allows one to remove unwanted data, filter data and push to an output destination. Therefore, it provides an end-to-end solution for data collection.

Some wonderful features of Fluent Bit are:

High performance
It is super lightweight and fast, and requires fewer resources and less memory
It supports multiple data formats
The configuration file for Fluent Bit is very easy to understand and modify
Fluent Bit has built-in TLS/SSL support; communication with the output destination is secured
Asynchronous I/O

Fluent Bit is compatible with Docker and Kubernetes and can therefore be used to aggregate application logs. There are several ways to log in Kubernetes. One way is the default stdout logs that are written to a host path "/var/log/containers" on the nodes in a cluster. This method requires a Fluent Bit DaemonSet to be deployed. A DaemonSet deploys a Fluent Bit container on each node in the cluster. The second way of logging is the use of a persistent volume. This allows logs to be written and persisted to internal or external storage such as Cephfs. Fluent Bit can be set up as a Deployment to read logs from a persistent volume.

In this blog, we will look at how to send logs from a Kubernetes persistent volume to Elasticsearch using Fluent Bit. Once logs are sent to Elasticsearch, we can use Kibana to visualize and create dashboards using application logs and metrics.

PREREQUISITES:

First, we need to have a running Kubernetes cluster. You can use our guides below to set one up if you do not have one yet:

Install Kubernetes Cluster on Ubuntu with kubeadm
Install Kubernetes Cluster on CentOS 7 with kubeadm
Install Production Kubernetes Cluster with Rancher RKE

Secondly, we will need an Elasticsearch cluster set up. You can use our Elasticsearch installation guide if you don't have one in place yet. In this tutorial, we will set up a sample Elasticsearch environment using StatefulSets deployed in the Kubernetes environment. We will also need a Kibana instance to help us visualize these logs.

Deploy Elasticsearch

1. Create the manifest file. This deployment assumes that we have a storage class cephfs in our cluster. A persistent volume will be created alongside the Elasticsearch StatefulSet. Modify this configuration as per your needs.

$ vim elasticsearch-ss.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: es-cluster
spec:
  serviceName: elasticsearch
  replicas: 1
  selector:
    matchLabels:
      app: elasticsearch
  template:
    metadata:
      labels:
        app: elasticsearch
    spec:
      containers:
      - name: elasticsearch
        image: docker.elastic.co/elasticsearch/elasticsearch:7.2.0
        resources:
          limits:
            cpu: 1000m
          requests:
            cpu: 100m
        ports:
        - containerPort: 9200
          name: rest
          protocol: TCP
        - containerPort: 9300
          name: inter-node
          protocol: TCP
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
        env:
        - name: cluster.name
          value: k8s-logs
        - name: node.name
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: discovery.seed_hosts
          value: "es-cluster-0.elasticsearch"
        - name: cluster.initial_master_nodes
          value: "es-cluster-0"
        - name: ES_JAVA_OPTS
          value: "-Xms512m -Xmx512m"
      initContainers:
      - name: fix-permissions
        image: busybox
        command: ["sh", "-c", "chown -R 1000:1000 /usr/share/elasticsearch/data"]
        securityContext:
          privileged: true
        volumeMounts:
        - name: data
          mountPath: /usr/share/elasticsearch/data
      - name: increase-vm-max-map
        image: busybox
        command: ["sysctl", "-w", "vm.max_map_count=262144"]
        securityContext:
          privileged: true
      - name: increase-fd-ulimit
        image: busybox
        command: ["sh", "-c", "ulimit -n 65536"]
        securityContext:
          privileged: true
  volumeClaimTemplates:
  - metadata:
      name: data
      labels:
        app: elasticsearch
    spec:
      accessModes: [ "ReadWriteOnce" ]
      storageClassName: cephfs
      resources:
        requests:
          storage: 5Gi

Apply this configuration:

$ kubectl apply -f elasticsearch-ss.yaml

2. Create an Elasticsearch service.

$ vim elasticsearch-svc.yaml

kind: Service
apiVersion: v1
metadata:
  name: elasticsearch
  labels:
    app: elasticsearch
spec:
  selector:
    app: elasticsearch
  clusterIP: None
  ports:
  - port: 9200
    name: rest
  - port: 9300
    name: inter-node

$ kubectl apply -f elasticsearch-svc.yaml

3. Deploy Kibana.

$ vim kibana.yaml

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kibana
  labels:
    app: kibana
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kibana
  template:
    metadata:
      labels:
        app: kibana
    spec:
      containers:
      - name: kibana
        image: docker.elastic.co/kibana/kibana:7.2.0
        resources:
          limits:
            cpu: 1000m
          requests:
            cpu: 100m
        env:
        - name: ELASTICSEARCH_URL
          value: http://elasticsearch:9200
        ports:
        - containerPort: 5601
---
apiVersion: v1
kind: Service
metadata:
  name: kibana
  labels:
    app: kibana
spec:
  ports:
  - port: 5601
  selector:
    app: kibana

Apply this configuration:

$ kubectl apply -f kibana.yaml

4. We then need to configure an ingress route for the Kibana service as follows:

$ vim kibana-ingress.yaml

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    kubernetes.io/tls-acme: "true"
    ingress.kubernetes.io/force-ssl-redirect: "true"
  name: kibana
spec:
  rules:
  - host: kibana.computingpost.com
    http:
      paths:
      - backend:
          serviceName: kibana
          servicePort: 5601
        path: /
  tls:
  - hosts:
    - kibana.computingpost.com
    secretName: ingress-secret # This can be created prior if using custom certs

$ kubectl apply -f kibana-ingress.yaml

The Kibana service should now be accessible via https://kibana.computingpost.com/

Once we have this set up, we can proceed to deploy Fluent Bit.

Step 1: Deploy Service Account, Role and Role Binding

Create a deployment file with the following contents:

$ vim fluent-bit-role.yaml

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit-read
rules:
- apiGroups: [""]
  resources:
  - namespaces
  - pods
  verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit-read
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit-read
subjects:
- kind: ServiceAccount
  name: fluent-bit
  namespace: default

Apply the deployment config by running the command below.

kubectl apply -f fluent-bit-role.yaml

Step 2: Deploy a Fluent Bit ConfigMap

This ConfigMap allows us to configure our Fluent Bit service accordingly. Here, we define the log parsing and routing for Fluent Bit. Change this configuration to match your needs.

$ vim fluentbit-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  labels:
    k8s-app: fluent-bit
  name: fluent-bit-config
data:
  filter-kubernetes.conf: |
    [FILTER]
        Name                kubernetes
        Match               *
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log
        Merge_Log           On
        Merge_Log_Key       log_processed
        K8S-Logging.Parser  On
        K8S-Logging.Exclude Off
  fluent-bit.conf: |
    [SERVICE]
        Flush         1
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-elasticsearch.conf
  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               *
        Path              /var/log/*.log
        Parser            json
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On
        Refresh_Interval  10
  output-elasticsearch.conf: |
    [OUTPUT]
        Name            es
        Match           *
        Host            ${FLUENT_ELASTICSEARCH_HOST}
        Port            ${FLUENT_ELASTICSEARCH_PORT}
        Logstash_Format On
        Replace_Dots    On
        Retry_Limit     False
  parsers.conf: |
    [PARSER]
        Name   apache
        Format regex
        Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   apache2
        Format regex
        Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   apache_error
        Format regex
        Regex  ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$

    [PARSER]
        Name   nginx
        Format regex
        Regex  ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   json
        Format json
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name        docker
        Format      json
        Time_Key    time
        Time_Format %Y-%m-%d %H:%M:%S.%L
        Time_Keep   On

    [PARSER]
        # http://rubular.com/r/tjUt3Awgg4
        Name   cri
        Format regex
        Regex  ^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<logtag>[^ ]*) (?<message>.*)$
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L%z

    [PARSER]
        Name    syslog
        Format  regex
        Regex   ^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
        Time_Key    time
        Time_Format %b %d %H:%M:%S

kubectl apply -f fluentbit-configmap.yaml

Step 3: Create a Persistent Volume Claim

This is where we will write application logs.

$ vim pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: logs-pvc
spec:
  accessModes:
  - ReadWriteMany
  storageClassName: cephfs # Change accordingly
  resources:
    requests:
      storage: 5Gi

$ kubectl apply -f pvc.yaml

Step 4: Deploy a Kubernetes Deployment using the ConfigMap in a file

$ vim fluentbit-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    k8s-app: fluent-bit-logging
  name: fluent-bit
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: fluent-bit-logging
  template:
    metadata:
      annotations:
        prometheus.io/path: /api/v1/metrics/prometheus
        prometheus.io/port: "2020"
        prometheus.io/scrape: "true"
      labels:
        k8s-app: fluent-bit-logging
        kubernetes.io/cluster-service: "true"
        version: v1
    spec:
      containers:
      - env:
        - name: FLUENT_ELASTICSEARCH_HOST
          value: elasticsearch
        - name: FLUENT_ELASTICSEARCH_PORT
          value: "9200"
        image: fluent/fluent-bit:1.5
        imagePullPolicy: Always
        name: fluent-bit
        ports:
        - containerPort: 2020
          protocol: TCP
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /var/log
          name: varlog
        - mountPath: /fluent-bit/etc/
          name: fluent-bit-config
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext: {}
      serviceAccount: fluent-bit
      serviceAccountName: fluent-bit
      volumes:
      - name: varlog
        persistentVolumeClaim:
          claimName: logs-pvc
      - configMap:
          defaultMode: 420
          name: fluent-bit-config
        name: fluent-bit-config

Create the objects by running the command below:

$ kubectl apply -f fluentbit-deployment.yaml

Step 5: Deploy an application

Let's test that our Fluent Bit service works as expected. We will use a test application that writes logs to our persistent volume.

$ vim testpod.yaml

apiVersion: v1
kind: Pod
metadata:
  name: test-pod
spec:
  containers:
  - name: app
    image: centos
    command: ["/bin/sh"]
    args: ["-c", "while true; do echo $(date -u) >> /var/log/app.log; sleep 5; done"]
    volumeMounts:
    - name: persistent-storage
      mountPath: /var/log
  volumes:
  - name: persistent-storage
    persistentVolumeClaim:
      claimName: logs-pvc

Apply with the command:

$ kubectl apply -f testpod.yaml

Check if the pod is running.

$ kubectl get pods

You should see the following output:

NAME       READY   STATUS    RESTARTS   AGE
test-pod   1/1     Running   0          107s

Once the pod is running, we can proceed to check whether logs are sent to Elasticsearch. On Kibana, we will have to create an index as shown below. Click on "Management > Index Patterns > Create index pattern". Once the index has been created, click on the Discover icon to see if our logs are in place.

See more guides on Kubernetes on our site.
1.1 Billion Taxi Rides: 108-core ClickHouse Cluster
ClickHouse is an open source, columnar-oriented database. It has a sweet spot where 100s of analysts can query non-rolled-up / cubed data quickly, even when tens of billions of new records a day are introduced. The infrastructure costs supporting such a system can come under $100K / year, and potentially half of that if usage permits. Yandex Metrica's ClickHouse installation at one point had 10s of trillions of records. Beyond Yandex, ClickHouse has also seen success recently at Bloomberg and CloudFlare.
Two years ago I benchmarked the database using a single machine and it came out as the fastest free database software I'd seen complete the benchmark. Since then, they've continued to add features including support for Kafka, HDFS and ZStandard compression. Last year they added support for stacking compression methods so that delta-of-delta compression became possible. When compressing time series data, gauge values can compress well with delta encoding but counters will do better with delta-of-delta encoding. Good compression has been a key to ClickHouse's performance.
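As a sketch of what that looks like in practice, per-column codecs can be stacked in the table definition; this assumes a ClickHouse release recent enough to support column codecs, and the table and column names are made up:

CREATE TABLE metrics (
    ts      DateTime,
    gauge   Float64 CODEC(Delta, ZSTD),        -- delta encoding suits gauge values
    counter UInt64  CODEC(DoubleDelta, ZSTD)   -- delta-of-delta suits counters
) ENGINE = MergeTree()
ORDER BY ts;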
ClickHouse is made up of 170K lines of C++ code when excluding 3rd-party libraries and is one of the smaller distributed database codebases. For contrast, SQLite doesn't support distribution and has 235K lines of C code. As of this writing, 207 engineers have contributed to ClickHouse and the rate of commits has been accelerating for some time.
In March of 2017, ClickHouse began maintaining a CHANGELOG as an easy way to keep track of developments. They've also broken up the monolithic documentation file into a hierarchy of Markdown-based files. Issues and features for the software are tracked via GitHub and overall this software has become much more approachable in the past few years.
In this post I'm going to take a look at ClickHouse's clustered performance on AWS EC2 using 36-core CPUs and NVMe storage.
Launching an AWS EC2 Cluster
I'll be using three c5d.9xlarge EC2 instances for this post. They each contain 36 vCPUs, 72 GB of RAM, 900 GB of NVMe SSD storage and support 10 Gigabit networking. They cost $1.962 / hour each in eu-west-1 when launched on-demand. I'll be using Ubuntu Server 16.04 LTS for the operating system.
The firewall is set up so that the machines can communicate with one another without restrictions, but only my IPv4 address is white-listed to SSH into the cluster.
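For reference, locking a security group down this way with the AWS CLI looks roughly like the following; the group ID and address are placeholders:

$ MY_IP=203.0.113.10/32        # placeholder for my IPv4 address
$ SG=sg-0123456789abcdef0      # placeholder security group ID

# Allow SSH only from my address
$ aws ec2 authorize-security-group-ingress \
    --group-id $SG \
    --protocol tcp \
    --port 22 \
    --cidr $MY_IP

# Allow all traffic between members of the same security group
$ aws ec2 authorize-security-group-ingress \
    --group-id $SG \
    --protocol -1 \
    --source-group $SG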
NVMe Storage, Up and Running
On each of the servers I'll create an EXT4-formatted file system on the NVMe storage for ClickHouse to work off of.
$ sudo mkfs -t ext4 /dev/nvme1n1
$ sudo mkdir /ch
$ sudo mount /dev/nvme1n1 /ch
Once that's set up you can see its mount point and that 783 GB of capacity is available on each of the systems.
NAME        MAJ:MIN RM   SIZE RO TYPE MOUNTPOINT
loop0         7:0    0  87.9M  1 loop /snap/core/5742
loop1         7:1    0  16.5M  1 loop /snap/amazon-ssm-agent/784
nvme0n1     259:1    0     8G  0 disk
└─nvme0n1p1 259:2    0     8G  0 part /
nvme1n1     259:0    0 838.2G  0 disk /ch
Filesystem      Size  Used Avail Use% Mounted on
udev             35G     0   35G   0% /dev
tmpfs           6.9G  8.8M  6.9G   1% /run
/dev/nvme0n1p1  7.7G  967M  6.8G  13% /
tmpfs            35G     0   35G   0% /dev/shm
tmpfs           5.0M     0  5.0M   0% /run/lock
tmpfs            35G     0   35G   0% /sys/fs/cgroup
/dev/loop0       88M   88M     0 100% /snap/core/5742
/dev/loop1       17M   17M     0 100% /snap/amazon-ssm-agent/784
tmpfs           6.9G     0  6.9G   0% /run/user/1000
/dev/nvme1n1    825G   73M  783G   1% /ch
The dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together. They're stored on AWS S3 so I'll configure the AWS CLI with my access and secret keys.
$ sudo apt update
$ sudo apt install awscli
$ aws configure
I'll set the client's concurrent requests limit to 100 so the files download quicker than they would with stock settings.
$ aws configure set \
    default.s3.max_concurrent_requests \
    100
I'll download taxi ride dataset off of AWS S3 and store it on the NVMe drive on the first server. This dataset is ~104 GB when in GZIP-compressed, CSV format.
$ sudo mkdir -p /ch/csv
$ sudo chown -R ubuntu /ch/csv
$ aws s3 sync s3://<bucket>/csv /ch/csv
Installing ClickHouse
I'll first install a few software installation utilities for Java 8.
$ sudo apt install \
    software-properties-common \
    python-software-properties
I'll then install Oracle's Java 8 distribution as it's needed to run Apache ZooKeeper, a prerequisite of a distributed ClickHouse setup.
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt update
$ sudo apt install oracle-java8-installer
I'll then use Ubuntu's package management to install ClickHouse 18.16.1 and ZooKeeper on all three machines.
$ sudo apt-key adv \
    --keyserver hkp://keyserver.ubuntu.com:80 \
    --recv E0C56BD4
$ echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" | \
    sudo tee /etc/apt/sources.list.d/clickhouse.list
$ sudo apt-get update
$ sudo apt install \
    clickhouse-client \
    clickhouse-server \
    zookeeperd
I'll create a data directory for ClickHouse as well as some configuration overrides on all three servers.
$ sudo mkdir /ch/clickhouse
$ sudo chown -R clickhouse /ch/clickhouse
$ sudo mkdir -p /etc/clickhouse-server/conf.d
$ sudo vi /etc/clickhouse-server/conf.d/taxis.conf
These are the configuration overrides I'll be using.
<?xml version="1.0"?>
<yandex>
    <listen_host>0.0.0.0</listen_host>
    <path>/ch/clickhouse/</path>

    <remote_servers>
        <perftest_1shards_3replicas>
            <shard>
                <replica>
                    <host>172.30.2.200</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>172.30.2.214</host>
                    <port>9000</port>
                </replica>
                <replica>
                    <host>172.30.2.174</host>
                    <port>9000</port>
                </replica>
            </shard>
        </perftest_1shards_3replicas>
    </remote_servers>

    <zookeeper-servers>
        <node>
            <host>172.30.2.200</host>
            <port>2181</port>
        </node>
        <node>
            <host>172.30.2.214</host>
            <port>2181</port>
        </node>
        <node>
            <host>172.30.2.174</host>
            <port>2181</port>
        </node>
    </zookeeper-servers>

    <macros>
        <shard>01</shard>
        <replica>01</replica>
    </macros>
</yandex>
I'll then launch ZooKeeper and the ClickHouse Server on all three machines.
$ sudo /etc/init.d/zookeeper start
$ sudo service clickhouse-server start
Loading Data into ClickHouse
On the first server I'll create a trips table that will hold the taxi trips dataset using the Log engine.
$ clickhouse-client --host=0.0.0.0
CREATE TABLE trips (
    trip_id               UInt32,
    vendor_id             String,
    pickup_datetime       DateTime,
    dropoff_datetime      Nullable(DateTime),
    store_and_fwd_flag    Nullable(FixedString(1)),
    rate_code_id          Nullable(UInt8),
    pickup_longitude      Nullable(Float64),
    pickup_latitude       Nullable(Float64),
    dropoff_longitude     Nullable(Float64),
    dropoff_latitude      Nullable(Float64),
    passenger_count       Nullable(UInt8),
    trip_distance         Nullable(Float64),
    fare_amount           Nullable(Float32),
    extra                 Nullable(Float32),
    mta_tax               Nullable(Float32),
    tip_amount            Nullable(Float32),
    tolls_amount          Nullable(Float32),
    ehail_fee             Nullable(Float32),
    improvement_surcharge Nullable(Float32),
    total_amount          Nullable(Float32),
    payment_type          Nullable(String),
    trip_type             Nullable(UInt8),
    pickup                Nullable(String),
    dropoff               Nullable(String),
    cab_type              Nullable(String),
    precipitation         Nullable(Int8),
    snow_depth            Nullable(Int8),
    snowfall              Nullable(Int8),
    max_temperature       Nullable(Int8),
    min_temperature       Nullable(Int8),
    average_wind_speed    Nullable(Int8),
    pickup_nyct2010_gid   Nullable(Int8),
    pickup_ctlabel        Nullable(String),
    pickup_borocode       Nullable(Int8),
    pickup_boroname       Nullable(String),
    pickup_ct2010         Nullable(String),
    pickup_boroct2010     Nullable(String),
    pickup_cdeligibil     Nullable(FixedString(1)),
    pickup_ntacode        Nullable(String),
    pickup_ntaname        Nullable(String),
    pickup_puma           Nullable(String),
    dropoff_nyct2010_gid  Nullable(UInt8),
    dropoff_ctlabel       Nullable(String),
    dropoff_borocode      Nullable(UInt8),
    dropoff_boroname      Nullable(String),
    dropoff_ct2010        Nullable(String),
    dropoff_boroct2010    Nullable(String),
    dropoff_cdeligibil    Nullable(String),
    dropoff_ntacode       Nullable(String),
    dropoff_ntaname       Nullable(String),
    dropoff_puma          Nullable(String)
) ENGINE = Log;
I'll then decompress and load each of the CSV files into the trips table. The following completed in 55 minutes and 10 seconds. The data directory was 134 GB in size following this operation.
$ time (for FILENAME in /ch/csv/trips_x*.csv.gz; do
      gunzip -c $FILENAME | \
          clickhouse-client \
              --host=0.0.0.0 \
              --query="INSERT INTO trips FORMAT CSV"
  done)
The import rate was 155 MB/s of uncompressed CSV content. I suspect this was due to a bottleneck with GZIP decompression. It might have been quicker to decompress all the gzip files in parallel using xargs and then load in the decompressed data (a sketch of that approach appears after the glances output). Below is what glances was reporting during the CSV import process.
$ sudo apt install glances
$ sudo glances
ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws) Uptime: 0:11:42 CPU 8.2% nice: 0.0% LOAD 36-core MEM 9.8% active: 5.20G SWAP 0.0% user: 6.0% irq: 0.0% 1 min: 2.24 total: 68.7G inactive: 61.0G total: 0 system: 0.9% iowait: 1.3% 5 min: 1.83 used: 6.71G buffers: 66.4M used: 0 idle: 91.8% steal: 0.0% 15 min: 1.01 free: 62.0G cached: 61.6G free: 0 NETWORK Rx/s Tx/s TASKS 370 (507 thr), 2 run, 368 slp, 0 oth sorted automatically by cpu_percent, flat view ens5 136b 2Kb lo 343Mb 343Mb CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 100.4 1.5 1.65G 1.06G 9909 ubuntu 0 S 1:01.33 0 0 clickhouse-client --host=0.0.0.0 --query=INSERT INTO trips FORMAT CSV DISK I/O R/s W/s 85.1 0.0 4.65M 708K 9908 ubuntu 0 R 0:50.60 32M 0 gzip -d -c /ch/csv/trips_xac.csv.gz loop0 0 0 54.9 5.1 8.14G 3.49G 8091 clickhous 0 S 1:44.23 0 45M /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml loop1 0 0 4.5 0.0 0 0 319 root 0 S 0:07.50 1K 0 kworker/u72:2 nvme0n1 0 3K 2.3 0.0 91.1M 28.9M 9912 root 0 R 0:01.56 0 0 /usr/bin/python3 /usr/bin/glances nvme0n1p1 0 3K 0.3 0.0 0 0 960 root -20 S 0:00.10 0 0 kworker/28:1H nvme1n1 32.1M 495M 0.3 0.0 0 0 1058 root -20 S 0:00.90 0 0 kworker/23:1H
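As mentioned above, one way the import might have been sped up is decompressing the GZIP files in parallel before loading them. This is a rough, untested sketch of that approach; the -P 16 level of parallelism is an arbitrary choice.

$ find /ch/csv -name 'trips_x*.csv.gz' -print0 | \
    xargs -0 -n 1 -P 16 gunzip

$ time (for FILENAME in /ch/csv/trips_x*.csv; do
      clickhouse-client \
          --host=0.0.0.0 \
          --query="INSERT INTO trips FORMAT CSV" < $FILENAME
  done)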
I'll first free up some space on the NVMe drive by removing the source CSV files before continuing.
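A command along these lines would do it, assuming the CSVs still live under /ch/csv as above.

$ sudo rm /ch/csv/trips_x*.csv.gz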
Converting into Columnar Form
ClickHouse's Log engine stores data in a row-oriented format. In order to query the data faster I'll convert it into a column-oriented format using the MergeTree engine.
$ clickhouse-client --host=0.0.0.0
The following completed in 43 minutes and 56 seconds. The data directory was 347 GB in size following this operation.
CREATE TABLE trips_mergetree
ENGINE = MergeTree(pickup_date, pickup_datetime, 8192)
AS SELECT
    trip_id,
    CAST(vendor_id AS Enum8('1' = 1,
                            '2' = 2,
                            'CMT' = 3,
                            'VTS' = 4,
                            'DDS' = 5,
                            'B02512' = 10,
                            'B02598' = 11,
                            'B02617' = 12,
                            'B02682' = 13,
                            'B02764' = 14)) AS vendor_id,
    toDate(pickup_datetime) AS pickup_date,
    ifNull(pickup_datetime, toDateTime(0)) AS pickup_datetime,
    toDate(dropoff_datetime) AS dropoff_date,
    ifNull(dropoff_datetime, toDateTime(0)) AS dropoff_datetime,
    assumeNotNull(store_and_fwd_flag) AS store_and_fwd_flag,
    assumeNotNull(rate_code_id) AS rate_code_id,
    assumeNotNull(pickup_longitude) AS pickup_longitude,
    assumeNotNull(pickup_latitude) AS pickup_latitude,
    assumeNotNull(dropoff_longitude) AS dropoff_longitude,
    assumeNotNull(dropoff_latitude) AS dropoff_latitude,
    assumeNotNull(passenger_count) AS passenger_count,
    assumeNotNull(trip_distance) AS trip_distance,
    assumeNotNull(fare_amount) AS fare_amount,
    assumeNotNull(extra) AS extra,
    assumeNotNull(mta_tax) AS mta_tax,
    assumeNotNull(tip_amount) AS tip_amount,
    assumeNotNull(tolls_amount) AS tolls_amount,
    assumeNotNull(ehail_fee) AS ehail_fee,
    assumeNotNull(improvement_surcharge) AS improvement_surcharge,
    assumeNotNull(total_amount) AS total_amount,
    assumeNotNull(payment_type) AS payment_type_,
    assumeNotNull(trip_type) AS trip_type,
    pickup AS pickup,
    dropoff AS dropoff,
    CAST(assumeNotNull(cab_type) AS Enum8('yellow' = 1, 'green' = 2)) AS cab_type,
    precipitation AS precipitation,
    snow_depth AS snow_depth,
    snowfall AS snowfall,
    max_temperature AS max_temperature,
    min_temperature AS min_temperature,
    average_wind_speed AS average_wind_speed,
    pickup_nyct2010_gid AS pickup_nyct2010_gid,
    pickup_ctlabel AS pickup_ctlabel,
    pickup_borocode AS pickup_borocode,
    pickup_boroname AS pickup_boroname,
    pickup_ct2010 AS pickup_ct2010,
    pickup_boroct2010 AS pickup_boroct2010,
    pickup_cdeligibil AS pickup_cdeligibil,
    pickup_ntacode AS pickup_ntacode,
    pickup_ntaname AS pickup_ntaname,
    pickup_puma AS pickup_puma,
    dropoff_nyct2010_gid AS dropoff_nyct2010_gid,
    dropoff_ctlabel AS dropoff_ctlabel,
    dropoff_borocode AS dropoff_borocode,
    dropoff_boroname AS dropoff_boroname,
    dropoff_ct2010 AS dropoff_ct2010,
    dropoff_boroct2010 AS dropoff_boroct2010,
    dropoff_cdeligibil AS dropoff_cdeligibil,
    dropoff_ntacode AS dropoff_ntacode,
    dropoff_ntaname AS dropoff_ntaname,
    dropoff_puma AS dropoff_puma
FROM trips;
This is what glances looked like during the operation:
ip-172-30-2-200 (Ubuntu 16.04 64bit / Linux 4.4.0-1072-aws) Uptime: 1:06:09 CPU 10.3% nice: 0.0% LOAD 36-core MEM 16.1% active: 13.3G SWAP 0.0% user: 7.9% irq: 0.0% 1 min: 1.87 total: 68.7G inactive: 52.8G total: 0 system: 1.6% iowait: 0.8% 5 min: 1.76 used: 11.1G buffers: 71.8M used: 0 idle: 89.7% steal: 0.0% 15 min: 1.95 free: 57.6G cached: 57.2G free: 0 NETWORK Rx/s Tx/s TASKS 367 (523 thr), 1 run, 366 slp, 0 oth sorted automatically by cpu_percent, flat view ens5 1Kb 8Kb lo 2Kb 2Kb CPU% MEM% VIRT RES PID USER NI S TIME+ IOR/s IOW/s Command 241.9 12.8 20.7G 8.78G 8091 clickhous 0 S 30:36.73 34M 125M /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml DISK I/O R/s W/s 2.6 0.0 90.4M 28.3M 9948 root 0 R 1:18.53 0 0 /usr/bin/python3 /usr/bin/glances loop0 0 0 1.3 0.0 0 0 203 root 0 S 0:09.82 0 0 kswapd0 loop1 0 0 0.3 0.1 315M 61.3M 15701 ubuntu 0 S 0:00.40 0 0 clickhouse-client --host=0.0.0.0 nvme0n1 0 3K 0.3 0.0 0 0 7 root 0 S 0:00.83 0 0 rcu_sched nvme0n1p1 0 3K 0.0 0.0 0 0 142 root 0 S 0:00.22 0 0 migration/27 nvme1n1 25.8M 330M 0.0 0.0 59.7M 1.79M 2764 ubuntu 0 S 0:00.00 0 0 (sd-pam)
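As a quick sanity check on the conversion, the row counts of the original and converted tables can be compared from the same client session (a minimal sketch):

SELECT count(*) FROM trips;
SELECT count(*) FROM trips_mergetree;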
In the last benchmark several columns were cast and re-computed. I found a number of those functions no longer worked properly on this dataset. To get around this I removed the offending casts and loaded the data without converting it into more granular data types.
Distributing Data Across the Cluster
I'll be distributing the data across all three nodes in the cluster. To start, I'll create the table below on all three machines.
$ clickhouse-client --host=0.0.0.0
CREATE TABLE trips_mergetree_third (
    trip_id UInt32,
    vendor_id String,
    pickup_date Date,
    pickup_datetime DateTime,
    dropoff_date Date,
    dropoff_datetime Nullable(DateTime),
    store_and_fwd_flag Nullable(FixedString(1)),
    rate_code_id Nullable(UInt8),
    pickup_longitude Nullable(Float64),
    pickup_latitude Nullable(Float64),
    dropoff_longitude Nullable(Float64),
    dropoff_latitude Nullable(Float64),
    passenger_count Nullable(UInt8),
    trip_distance Nullable(Float64),
    fare_amount Nullable(Float32),
    extra Nullable(Float32),
    mta_tax Nullable(Float32),
    tip_amount Nullable(Float32),
    tolls_amount Nullable(Float32),
    ehail_fee Nullable(Float32),
    improvement_surcharge Nullable(Float32),
    total_amount Nullable(Float32),
    payment_type Nullable(String),
    trip_type Nullable(UInt8),
    pickup Nullable(String),
    dropoff Nullable(String),
    cab_type Nullable(String),
    precipitation Nullable(Int8),
    snow_depth Nullable(Int8),
    snowfall Nullable(Int8),
    max_temperature Nullable(Int8),
    min_temperature Nullable(Int8),
    average_wind_speed Nullable(Int8),
    pickup_nyct2010_gid Nullable(Int8),
    pickup_ctlabel Nullable(String),
    pickup_borocode Nullable(Int8),
    pickup_boroname Nullable(String),
    pickup_ct2010 Nullable(String),
    pickup_boroct2010 Nullable(String),
    pickup_cdeligibil Nullable(FixedString(1)),
    pickup_ntacode Nullable(String),
    pickup_ntaname Nullable(String),
    pickup_puma Nullable(String),
    dropoff_nyct2010_gid Nullable(UInt8),
    dropoff_ctlabel Nullable(String),
    dropoff_borocode Nullable(UInt8),
    dropoff_boroname Nullable(String),
    dropoff_ct2010 Nullable(String),
    dropoff_boroct2010 Nullable(String),
    dropoff_cdeligibil Nullable(String),
    dropoff_ntacode Nullable(String),
    dropoff_ntaname Nullable(String),
    dropoff_puma Nullable(String)
) ENGINE = MergeTree(pickup_date, pickup_datetime, 8192);
I'll then make sure the first server can see all three nodes in the cluster.
SELECT * FROM system.clusters WHERE cluster = 'perftest_1shards_3replicas' FORMAT Vertical;
Row 1:
──────
cluster:          perftest_1shards_3replicas
shard_num:        1
shard_weight:     1
replica_num:      1
host_name:        172.30.2.200
host_address:     172.30.2.200
port:             9000
is_local:         1
user:             default
default_database:

Row 2:
──────
cluster:          perftest_1shards_3replicas
shard_num:        1
shard_weight:     1
replica_num:      2
host_name:        172.30.2.214
host_address:     172.30.2.214
port:             9000
is_local:         1
user:             default
default_database:

Row 3:
──────
cluster:          perftest_1shards_3replicas
shard_num:        1
shard_weight:     1
replica_num:      3
host_name:        172.30.2.174
host_address:     172.30.2.174
port:             9000
is_local:         1
user:             default
default_database:
I'll then define a new table on the first server that's based on the trips_mergetree_third schema and uses the Distributed engine.
CREATE TABLE trips_mergetree_x3
    AS trips_mergetree_third
    ENGINE = Distributed(perftest_1shards_3replicas,
                         default,
                         trips_mergetree_third,
                         rand());
I'll then copy the data out of the MergeTree-based table and onto all three servers. The following completed in 58 minutes and 52 seconds.
INSERT INTO trips_mergetree_x3 SELECT * FROM trips_mergetree;
Following the above operation I gave ClickHouse 15 minutes to recede from its storage high-water mark. The data directories ended up being 421 GB, 144 GB and 144 GB in size respectively on each of the three servers.
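For reference, directory sizes like those above can be collected on each server with something along these lines, assuming the /ch/clickhouse data path configured earlier.

$ sudo du -sh /ch/clickhouse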
ClickHouse Cluster Benchmark
The following were the fastest times I saw after running each query multiple times on the trips_mergetree_x3 table.
$ clickhouse-client --host=0.0.0.0
The following completed in 2.502 seconds.
SELECT cab_type, count(*) FROM trips_mergetree_x3 GROUP BY cab_type;
The following completed in 1.880 seconds.
SELECT passenger_count, avg(total_amount) FROM trips_mergetree_x3 GROUP BY passenger_count;
The following completed in 1.609 seconds.
SELECT passenger_count,
       toYear(pickup_date) AS year,
       count(*)
FROM trips_mergetree_x3
GROUP BY passenger_count,
         year;
The following completed in 2.681 seconds.
SELECT passenger_count,
       toYear(pickup_date) AS year,
       round(trip_distance) AS distance,
       count(*)
FROM trips_mergetree_x3
GROUP BY passenger_count,
         year,
         distance
ORDER BY year,
         count(*) DESC;
While running query 1 I could see the first server's CPU was at ~50% utilisation while the other machines remained somewhat idle. This is a snippet from Glances:
CPU%    MEM%  VIRT   RES     PID  USER      NI S  TIME+    IOR/s IOW/s Command
1508.2   1.9  50.7G  1.32G  8091  clickhous  0 S  0:54.98      0    3K /usr/bin/clickhouse-server --config=/etc/clickhouse-server/config.xml
The above struck me as odd. I was expecting a single query to speed up as I scaled out horizontally. To add to that, I was expecting each query to take longer than the one before it.
I decided to run the same queries on the MergeTree-based table which sits solely on the first server.
ClickHouse Single-Node Benchmark
The following were the fastest times I saw after running each query multiple times on the trips_mergetree table.
The following completed in 0.241 seconds.
SELECT cab_type, count(*) FROM trips_mergetree GROUP BY cab_type;
The following completed in 0.826 seconds.
SELECT passenger_count, avg(total_amount) FROM trips_mergetree GROUP BY passenger_count;
The following completed in 1.209 seconds.
SELECT passenger_count,
       toYear(pickup_date) AS year,
       count(*)
FROM trips_mergetree
GROUP BY passenger_count,
         year;
The following completed in 1.781 seconds.
SELECT passenger_count,
       toYear(pickup_date) AS year,
       round(trip_distance) AS distance,
       count(*)
FROM trips_mergetree
GROUP BY passenger_count,
         year,
         distance
ORDER BY year,
         count(*) DESC;
Thoughts on the Results
This is the first time a free, CPU-based database has managed to outperform a GPU-based database in my benchmarks. That GPU database has since undergone two revisions but, nonetheless, the performance ClickHouse delivers on a single node is very impressive.
That being said, there is an order of magnitude of overhead when running Query 1 on the distributed engine. I'm hoping I've missed something in my research for this post because it would be good to see query times drop as I add more nodes to the cluster.
I've come across many setups where a single query won't be able to consume an entire cluster's resources but can run well concurrently with other queries. Given the single-node performance I'd consider ClickHouse for this sort of workload.
It would be nice to see ClickHouse evolve in such a way that storage and compute could be decoupled so that they could scale independently. The HDFS support that has been added in the last year could be a step towards this. On the compute side, if a single query can be sped up as more nodes are added to the cluster then the future for this software will be very bright.
Thank you for taking the time to read this post. I offer consulting, architecture and hands-on development services to clients in North America & Europe. If you'd like to discuss how my offerings can help your business please contact me via LinkedIn.
How to install Apache Kafka on Ubuntu
To install Apache Kafka on Ubuntu, the following prerequisites are required:
Java
Zookeeper
And finally, we shall set up Apache Kafka and run it on the Ubuntu machine.
Install Apache Kafka on Ubuntu
Install Java
Open a terminal and run the following command :
kafkauser@tutorialkart:~$ sudo apt-get install default-jdk
To verify the installation of Java, run the following command in the…
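A common way to verify is to check the version Java reports, for example:

kafkauser@tutorialkart:~$ java -version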