Hadoop meets Blockchain: Trust your (Big) Data

At a simple level, Blockchains solve a trust problem. Increasingly, companies rely on third parties to help drive brand recognition and gain consumer trust, and that includes trusting third-party data.  For these companies to succeed, it is vital that the data they receive is trustworthy and accurate. Each organization involved needs to trust that data entered into the system comes from a verifiable source, and each needs a guarantee that this data is identical for all participants in the network.  Many systems have been developed to solve this problem of trust. At phData we believe that Blockchain will be the backbone of a trusted economy where our customers reliably interact with their distributors and other third parties. As our customers begin using distributed ledgers to interact with their suppliers and customers, these ledgers will become a critical operational data source that companies need to ingest and analyze as part of their analytics and machine learning initiatives.

At phData we thrive on empowering businesses through automation and standardization.  Throughout our journey we have standardized a set of tools that lets businesses focus less on Hadoop basics and more on delivering business value.  This allows our customers to leverage our intimate knowledge of these tools to solve a wide array of business problems. However, with the advent of Blockchain we face a new standardization problem. In this post, we will present a new standard for the ingestion and analysis of blockchain data similar to the way we’ve standardized data pipelining with Pipewrench.

Why Create a Standard for Blockchain Analytics?

While Big Data is still a relatively young field in the tech ecosystem, it has already gone through its peak hype cycle.  On the flip side Blockchain is still incredibly early in its hype cycle and has yet to see wide enterprise adoption (even though you may know it for its financial hype cycle, which is irrelevant to this conversation and only serves as confirmation that this technology is not going away).  Google (Bitcoin BigQuery, Ethereum BigQuery), Amazon (BaaS), and Microsoft (Workbench) are all in the news for offering Blockchain services, signaling a need to plan for the future of enterprise adoption within this growing field.

Integration of disparate datasets is a business problem the Hadoop ecosystem is well equipped to handle, since it covers a vast array of use cases and inspires further investment by connecting the enterprise through data solutions.  Hadoop has been breaking down data silos across the enterprise for years, and the distributed ledger use case is no different. It’s simply a new data source for the Hadoop platform to aggregate, one that is itching to be integrated with enterprise data and to drive enterprise efficiency.  By leveraging phData’s existing tools and automated monitoring, your business can achieve insights faster and support these solutions across multiple environments.

Trust Visualized

In the big data industry, enterprises face three frequent problems when using data generated by third parties: data security, data reliability, and data sharing.  Blockchain addresses security through mathematically verifiable cryptographic signatures. In addition, because blocks are chained together by hashes and their contents are summarized in Merkle trees, data cannot be altered once it has been added to the Blockchain without the change being detected.  This immutability makes data derived from a Blockchain highly reliable, which in turn improves data quality across environments because every client shares a single source of truth.  Finally, Blockchain’s ability to store data securely and immutably in a public way removes the opportunity for bad actors to tamper with or manipulate it undetected. Combining these principles means that, instead of setting up and sharing databases between separate entities, organizations can share the public ledger as the source of trust via open source code and cryptography.  Third parties can then trust what each other report, because the dynamics of repeated games reward good actors and punish bad actors through a reputation score or a financial stake.  In other words, to participate in this market you must either maintain a reputation that is affected by the outcome of each process, or stake capital in escrow that is released on completion of the process at a profit or a loss depending on the outcome.
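To make the tamper-evidence claim concrete, here is a minimal sketch of a binary Merkle tree. It is an illustration only: it uses SHA-256 and duplicates the last node on odd levels, which are assumptions chosen for brevity, whereas Ethereum itself uses Merkle Patricia tries with Keccak-256. The record strings are hypothetical.

```python
import hashlib
from typing import Sequence


def sha256(data: bytes) -> bytes:
    """Hash helper used for both leaves and internal nodes."""
    return hashlib.sha256(data).digest()


def merkle_root(leaves: Sequence[bytes]) -> bytes:
    """Return the root hash of a simple binary Merkle tree over `leaves`."""
    if not leaves:
        raise ValueError("at least one leaf is required")
    level = [sha256(leaf) for leaf in leaves]
    while len(level) > 1:
        if len(level) % 2 == 1:
            # Duplicate the last hash so every node has a sibling
            # (one common convention, assumed here).
            level.append(level[-1])
        level = [sha256(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


# Hypothetical supply chain records.
records = [b"farm:pig-1", b"truck:pig-1", b"factory:pig-1"]
original = merkle_root(records)
tampered = merkle_root([b"farm:pig-1", b"truck:pig-1 (edited)", b"factory:pig-1"])
print(original != tampered)  # editing any record changes the root
```

Because every participant can recompute the root from the records, a single altered record is enough to make the stored root and the recomputed root disagree.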

To provide a more concrete example, let’s visualize a typical supply chain.  Supply chains are high-touch ecosystems that require trust at every step to ensure safe and dependable delivery of goods.  By introducing Blockchain into the equation, it becomes possible for third parties to gain insight into where supply chains break down, or even to protect public health and safety by tracking and stopping sources of product contamination.

Source: https://resolvesp.com/wp-content/uploads/2017/05/sas-04-1.png

In the above example data is entered onto the blockchain at every step of the process. If, say, the distributor is 3 days late delivering products to the retailer they cannot falsify the date they received the product due to the blockchain’s secure and immutable data storage properties.  Another example would be verification that goods were kept at a safe temperature during transportation via tamper proof IoT devices attached to the product to ensure food safety regulations are met.

Gaining trust in and insight into our data will create growing opportunities for individuals and organizations to profit from data sharing, since Blockchain will enable the rise of trusted data marketplaces.  While the possibilities are exciting, we still need to approach this as a Hadoop solution and determine how to make sense of the available data. The first step is identifying how to reliably ingest data from the Blockchain we are interested in (or potentially many Blockchains).  There are many solutions to this problem, and we think the simplest is typically the best. Enter JSON-RPC, a protocol that standardizes API communication through a simple RPC interface; the examples later in this post demonstrate how to use it.  Because the protocol is so widely used, it allows us to standardize requests across Blockchains, and it can be accessed locally or remotely on nodes available across the network.  Since all nodes in the network must come to a consensus on the truth, this data must be the same on all nodes, and the communication is simple enough that we already have tooling to handle it.

Why Collect Blockchain Data?

Blockchain information is quickly becoming part of the public domain, as demonstrated by Bitcoin and Ethereum data being made widely available to the public via BigQuery (BigQuery Bitcoin; BigQuery Ethereum).  Giving a wider audience the tools necessary to do analysis and analytics on top of the data generated on the Blockchain will provide greater insight and ideas as the ecosystem matures.  The addition of these datasets also speaks to the value proposition of Blockchains and to the data standards they help drive by ensuring all information on the network adheres to strict standards.  This standardization improves data quality.

Empowering individuals to do analytics on top of Blockchain data without the need to maintain full nodes is a great start for the public domain.  So is the ability to identify where products originate and to track every touch point in between until they are in the hands of the consumer.  These are powerful tools, but they do little to solve the integration problem of legacy enterprise ecosystems. You may have heard that Blockchain technologies could eventually disrupt transaction settlement, legal procedures, supply chain management, purchase orders (POs), financial balance sheets, accounting, and even data science.  Currently these promises are far from being a corporate reality. In the current state, enterprises will be required to integrate data generated on the Blockchain with the rest of the enterprise, ensuring the rest of the business works from the version of truth recorded on the Blockchain. This will ensure that businesses can still function and workers continue getting paid regardless of the underlying ecosystem the business is built on.  This means we need to define standards and automation to ensure repeatable ingestion from our new data source.

Collecting and Inspecting Blockchain Data

Before we start using Blockchain to share data, we need to define an ingest protocol that retrieves new information and integrates it with data already present in the Hadoop environment.  Here we will discuss how to start this process with some simple examples. While a production deployment would most likely require maintaining a full node, preferably inside a Docker container, let’s attempt to ingest data with tooling we already know how to use.  First we need a data source. For this simple example we are going to use Ethereum as the Blockchain of choice. Since we aren’t maintaining our own node in this example, we want to develop something that is portable and relies only on publicly available endpoints. If you were running a full node, you would most likely be interacting with it through Geth, the Go Ethereum client, using Ethereum’s defined JSON-RPC API.  The first request fetches the number of the most recent block generated by the network: https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_blocknumber

 

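// Request (the endpoint shown is an assumption: Geth's default local HTTP-RPC address; substitute your own node or provider URL)
curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}' http://localhost:8545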

// Result
{
  "id":1,
  "jsonrpc": "2.0",
  "result": "0x5f3dd3" // 6241747
}
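The same request and response can be handled in a few lines of Python. This is a minimal sketch that only builds the request body and decodes the hex-encoded result shown above; the helper names `rpc_payload` and `hex_to_int` are illustrative, and sending the body over HTTP is left out.

```python
import json


def rpc_payload(method: str, params: list, request_id: int) -> str:
    """Serialize a JSON-RPC 2.0 request body."""
    return json.dumps(
        {"jsonrpc": "2.0", "method": method, "params": params, "id": request_id}
    )


def hex_to_int(quantity: str) -> int:
    """Decode a 0x-prefixed hex quantity as returned by Ethereum JSON-RPC."""
    return int(quantity, 16)


body = rpc_payload("eth_blockNumber", [], 1)

# The response from the example above, parsed as JSON.
response = json.loads('{"id": 1, "jsonrpc": "2.0", "result": "0x5f3dd3"}')
latest_block = hex_to_int(response["result"])
print(latest_block)  # 6241747
```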

 

Now that we have the number of the most recently generated block we can request the details of the block with the following JSON-RPC request: https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getblockbynumber

 

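// Request (the endpoint shown is an assumption: Geth's default local HTTP-RPC address; substitute your own node or provider URL)
curl -X POST -H "Content-Type: application/json" --data '{"jsonrpc":"2.0","method":"eth_getBlockByNumber","params":["0x5f3dd3", true],"id":2}' http://localhost:8545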




// Result
{
    "jsonrpc": "2.0",
    "id": 2,
    "result": {
        "difficulty": "0xbfb5cbb4f1c34",
        "extraData": "0x65746865726d696e652d657535",
        "gasLimit": "0x7a121d",
        "gasUsed": "0x794153",
        "hash": "0xa75b171eb586c7567b8c1c0617f01d9803b1faf77a06afd5eb33306b40767460",
        "logsBloom":  ...,
        "miner": "0xea674fdde714fd979de3edf0f56aa9716b898ec8",
        "mixHash": "0xa09b38efc6aa3d74635c21ca5ae44b5b7b4e1bd445c3d602162edecb3fc03d5b",
        "nonce": "0xd97fbe7c1672ee17",
        "number": "0x5f3dd3",
        "parentHash": "0x9dc31ebf4887c75163d8003154dd2a29f69de64655ae4c2e75e556dabc57ed2a",
        "receiptsRoot": "0x54b2b10eadc53f57b807f3364bf41d7ba70d3332c321ecfec1314606fc5ba00f",
        "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347",
        "size": "0x8855",
        "stateRoot": "0x755c1d82e803af1f22d6ddba91bea12892fde0d177cc2c209ba5e3bb463c54a5",
        "timestamp": "0x5b8825bd",
        "totalDifficulty": "0x157406099fae63bef8b",
        "transactions": [
            {
                "blockHash": "0xa75b171eb586c7567b8c1c0617f01d9803b1faf77a06afd5eb33306b40767460",
                "blockNumber": "0x5f3dd3",
                "from": "0xeb6d43fe241fb2320b5a3c9be9cdfd4dd8226451",
                "gas": "0x15f90",
                "gasPrice": "0x1ef76af200",
                "hash": "0x90f6e26ca6805e1e5562ea836a2cf9509c3b8ed7339595ede7ae8ac60e9a68a4",
                "input": "0x",
                "nonce": "0xf233",
                "r": "0xa46c18c81149a8665ac49d9f539814f7b749490e3b84aaafb29dd6b4a180fe41",
                "s": "0x3aa10b4a15aa1d41d08d3422ae19a2ee1bf4a9c7c166a0bb17f9c637934fed73",
                "to": "0xcebecbc4c763f2938e7c50263b6c94f9f85283a2",
                "transactionIndex": "0x0",
                "v": "0x26",
                "value": "0x6de97e09bd18000"
            },
            ...,
            {
                "blockHash": "0xa75b171eb586c7567b8c1c0617f01d9803b1faf77a06afd5eb33306b40767460",
                "blockNumber": "0x5f3dd3",
                "from": "0x019221aef1f8b4687291eb587c496a30623bc6b9",
                "gas": "0x1aea9",
                "gasPrice": "0x9502f900",
                "hash": "0x78d697e4c6901d1bf917bf2cc986f2c75bcb4bdeb68c3d931b2f748ab12dee8c",
                "input": "0xa9059cbb0000000000000000000000000818cd4a71e0b1ffd672029ae50ec86129e60c2e00000000000000000000000000000000000000000000000040cfb79d1b2a5000",
                "nonce": "0x302",
                "r": "0x2600f40e2cbdfadf0e35e223ac6604ed8e8f86c6ae3be4384d3b6f7205913c61",
                "s": "0x3fb6613682a1913de5bb08e8ed19783a7dffb9a2a8f40204df1eba41790726c3",
                "to": "0x5ae655088e79fa0ced7b472d3bacd215ef796ccc",
                "transactionIndex": "0x39",
                "v": "0x1b",
                "value": "0x0"
            }
        ],
        "transactionsRoot": "0xc362eb2780891407058657a37acfcec177ce14691eddcc1fc22101f77a314d13",
        "uncles": []
    }
}

There is a lot to take in from this response.  At a high level, the data tells us the parent block (forming the reverse linked list), the time the block was generated, how difficult the block was to mine, the amount of gas used by the block’s transactions (gas is the unit in which transaction fees are measured), the miner who solved the block (and was rewarded the transaction fees), and the list of transactions processed in the block.  Each transaction tells us the parties involved (the to and from addresses), the gas price of the transaction, the value of the asset transferred, and the parameters sent to interact with smart contracts (Blockchain’s term for functions).
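Since every numeric field in the response is a hex string, a small decoding step makes the block easier to read. The sketch below assumes a block dictionary shaped like the `result` object above, trimmed here to a few fields; the function names are illustrative and not part of any library.

```python
from datetime import datetime, timezone

WEI_PER_ETHER = 10**18


def summarize_block(block: dict) -> dict:
    """Decode the hex-encoded header fields discussed above."""
    return {
        "number": int(block["number"], 16),
        "mined_at": datetime.fromtimestamp(int(block["timestamp"], 16), tz=timezone.utc),
        "gas_used": int(block["gasUsed"], 16),
        "miner": block["miner"],
        "transaction_count": len(block["transactions"]),
    }


def value_in_ether(tx: dict) -> float:
    """Convert a transaction's hex-encoded wei value to ether."""
    return int(tx["value"], 16) / WEI_PER_ETHER


# Trimmed copy of the response above (only the first transaction is kept).
block = {
    "number": "0x5f3dd3",
    "timestamp": "0x5b8825bd",
    "gasUsed": "0x794153",
    "miner": "0xea674fdde714fd979de3edf0f56aa9716b898ec8",
    "transactions": [{"value": "0x6de97e09bd18000"}],
}

summary = summarize_block(block)
print(summary["number"])                     # 6241747
print(summary["mined_at"].isoformat())       # 2018-08-30T17:13:33+00:00
print(value_in_ether(block["transactions"][0]))
```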

Standardize the Ingest Pipeline – First Iteration

At this point we have the absolute basics of what we need to start collecting data as it is generated, as long as we keep retrieving the newest generated block.  Since we are not running a full node, we will go to an external party to make this request. For this example we will be using infura.io, which supports the requests we used above, as documented by the following:

We have gathered our base layer for data ingestion; now we need a tool to help us standardize this approach. Enter the StreamSets Data Collector.  StreamSets Data Collector is the tool phData has standardized on to provide scalable solutions for data ingestion into the Enterprise Data Hub. It provides an easy-to-understand visualization of your ingestion pipeline as well as the ability to automate deployments via a Python SDK, with SDK-defined pipelines translated into pipeline visualizations.  We now have the pieces to start our Blockchain ingestion, which looks like the following:

 

For step 1, we are requesting the most recently generated block number with the following configurations:

 

For step 2, we are passing the block number as a parameter to the next request to get the entire block.  NOTE: we are passing “true” in the list of params so that we get the full details of each transaction. If you only want each transaction’s hash, you can set this value to “false” and add another step that retrieves only the transactions you need.
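Outside of StreamSets, the two steps can be sketched as a pair of chained calls. This is not the pipeline configuration itself: `call` is a hypothetical transport function that sends a JSON-RPC method and params to a node and returns the `result` field, and `fake_call` returns canned data so the example runs without a node.

```python
from typing import Any, Callable, Dict, List

# Hypothetical transport: takes a method name and params, returns the "result" field.
RpcCall = Callable[[str, List[Any]], Any]


def fetch_latest_block(call: RpcCall, full_transactions: bool = True) -> Dict[str, Any]:
    """Step 1: ask for the newest block number. Step 2: fetch that block."""
    latest_hex = call("eth_blockNumber", [])
    return call("eth_getBlockByNumber", [latest_hex, full_transactions])


def fake_call(method: str, params: List[Any]) -> Any:
    """Canned responses standing in for a real node, for illustration only."""
    if method == "eth_blockNumber":
        return "0x5f3dd3"
    if method == "eth_getBlockByNumber":
        number, full = params
        # With full=False a node returns only transaction hashes.
        txs = [{"hash": "0xaa"}] if full else ["0xaa"]
        return {"number": number, "transactions": txs}
    raise ValueError(f"unexpected method: {method}")


hashes_only = fetch_latest_block(fake_call, full_transactions=False)
print(hashes_only["number"], hashes_only["transactions"])
```

Passing the transport in as a parameter keeps the two-step logic independent of whether the node is local, remote, or a hosted provider.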

 

At this point the processing forks the information into two locations.  The first location stores the data at full fidelity in HDFS so that further analysis can be done and the data can move into the typical information architecture shared by all of your existing data pipelines.  The second location pivots the transactions in the block into individual records. In our example the block we retrieved had 59 transactions, which when pivoted turn into 59 records to be upserted into our Kudu table for further analytics.  The benefit of doing upserts into Kudu is that the primary key for a transaction will not change, so if data is ingested again each record will remain unique.
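The pivot and the idempotent upsert can be illustrated with plain Python. In this sketch a dictionary keyed by transaction hash stands in for the Kudu table, and the output column names are assumptions chosen for the example rather than the schema used in the pipeline.

```python
from typing import Dict, List


def pivot_transactions(block: dict) -> List[dict]:
    """Turn one block into one flat record per transaction."""
    rows = []
    for tx in block["transactions"]:
        rows.append({
            "hash": tx["hash"],  # primary key
            "block_number": int(tx["blockNumber"], 16),
            "transaction_index": int(tx["transactionIndex"], 16),
            "from_address": tx["from"],
            "to_address": tx.get("to"),  # null for contract creation
            "value_wei": int(tx["value"], 16),
        })
    return rows


def upsert(table: Dict[str, dict], rows: List[dict]) -> None:
    """Insert new rows and overwrite existing ones that share a primary key."""
    for row in rows:
        table[row["hash"]] = row


# Hypothetical two-transaction block in the JSON-RPC shape shown earlier.
sample_block = {
    "transactions": [
        {"hash": "0x01", "blockNumber": "0x5f3dd3", "transactionIndex": "0x0",
         "from": "0xaaa", "to": "0xbbb", "value": "0x10"},
        {"hash": "0x02", "blockNumber": "0x5f3dd3", "transactionIndex": "0x1",
         "from": "0xccc", "to": "0xddd", "value": "0x0"},
    ]
}

table: Dict[str, dict] = {}
upsert(table, pivot_transactions(sample_block))
upsert(table, pivot_transactions(sample_block))  # re-ingesting the same block
print(len(table))  # 2
```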

Standardize the Ingest Pipeline – Second Iteration

The above pipeline helps to visualize each step of the process as well as standardize how we will store data in each data storage engine.  The savvy reader will realize there are a couple of gaps in the above solution and that there is room for improvement. The main issue with the first iteration of our pipeline arises when the platform or enterprise requires system downtime for patching or regular system maintenance.  The pipeline(s) will have to pause or stop during the maintenance window and then resume from the last block consumed. The first iteration does not take this into consideration and introduces the possibility of missing generated blocks, since it only retrieves the most recently generated block.  Using the custom phData Blockchain origin, we can solve both of these issues by (1) looking for missing blocks and (2) catching up by ingesting the block numbers that were missed.
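The two behaviors can be sketched independently of any particular origin. The functions below are an illustration of the idea under simple assumptions (block numbers are contiguous integers and the last ingested block number is stored somewhere durable); they are not the implementation of the phData Blockchain origin.

```python
from typing import Iterable, List


def blocks_to_fetch(last_ingested: int, latest: int) -> range:
    """Catch up: every block after the last one consumed, up to the chain head."""
    return range(last_ingested + 1, latest + 1)


def missing_blocks(ingested: Iterable[int], first: int, last: int) -> List[int]:
    """Gap check: block numbers in [first, last] that were never ingested."""
    seen = set(ingested)
    return [number for number in range(first, last + 1) if number not in seen]


# The pipeline stopped at block 6241747 and the chain head is now 6241750.
print(list(blocks_to_fetch(6241747, 6241750)))  # [6241748, 6241749, 6241750]

# Blocks 6241745 and 6241748 were skipped during an earlier run.
print(missing_blocks([6241744, 6241746, 6241747, 6241749], 6241744, 6241749))
```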

Standardize the Ingest Pipeline – Third Iteration

The first two iterations of this pipeline have focused on collecting information from a centralized location via RPC calls.  This solution works, but it does not include ERC20 transfers (token transactions) and is less flexible when ingesting large quantities of data, including all data since genesis.  To gain control over your infrastructure, it is best to maintain your own Blockchain client, because the client automatically connects to its neighbors and therefore handles synchronization better as participants drop out of and rejoin the network.  As long as there is an internet connection, the client software negotiates the network to keep your system up to date by receiving each new block over a distributed peer-to-peer network. This minimizes downtime since it removes the reliance on a centralized third party.

To run a full node, it’s recommended to run it inside a Docker container to ensure it’s isolated from other resources in the enterprise.  By using Docker we can take advantage of its built-in benefits, including upgradability, transparency, modularity, and environment parity.  To get started we have included links to the Ethereum resources for running its client inside of Docker containers:

Once the Blockchain has synchronized with the network (i.e. reached the same block height) it’s now possible to safely interact with the client, including transacting and ingesting.  The benefit here is that we can reuse our second iteration pipeline by interacting with geth’s RPC interface (IPC is also available).

There are further enhancements that we can include in our pipeline and the good news is almost everything being built in this space is Open Source and available for public use.  A couple of beneficial helpers in the ingestion space are:

Delivering Insights

Building enterprise applications is a complicated process beginning with defining goals and requirements, developing integration with existing systems, integrating it into the existing process, and finally analyzing how the application is performing.  Introducing Blockchain into this process is no different. As this post should make clear there is Blockchain development work along with data engineering work required to reach the point where data analytics is achievable. Another key factor for available data is the distributed ledger of choice.  While we have focused on Ethereum many other ledgers exist and most large scale initiatives have yet to make their way to this network’s mainnet (many trials are in progress on the testnet).

With that knowledge we can find a simple corollary to our example of the supply chain for pork products.  As pigs are entered into the system they can be tracked via a unique ID which then gets attributes attached to it when the pig is measured and weighed.  A shipping truck scans the unique ID of the pigs it will be transporting to the factory, which will add to the attributes the unique ID will be tracking.  Once in the factory the pig is then a parent to multiple child products, all of which have a new child unique ID tied together via the parent ID. This requires us to analyze information based on a unique ID and tie information back to it as the item passes through the supply chain.
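The parent and child relationship described here maps onto a small data structure. The following is a minimal sketch with hypothetical IDs and attributes; it only shows how a child product can be traced back to the animal it came from.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class TrackedItem:
    item_id: str
    parent_id: Optional[str] = None
    events: List[dict] = field(default_factory=list)  # attributes added at each touch point


def lineage(items: Dict[str, TrackedItem], item_id: str) -> List[str]:
    """Walk parent links from an item back to its original ancestor."""
    chain = []
    current = items.get(item_id)
    while current is not None:
        chain.append(current.item_id)
        current = items.get(current.parent_id) if current.parent_id else None
    return chain


# Hypothetical data: one pig, weighed and shipped, then split into a child product.
items = {
    "pig-1": TrackedItem("pig-1", events=[{"step": "weighed", "kg": 110},
                                          {"step": "loaded", "truck": "T-7"}]),
    "pig-1-ham": TrackedItem("pig-1-ham", parent_id="pig-1"),
}

print(lineage(items, "pig-1-ham"))  # ['pig-1-ham', 'pig-1']
```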

Fortunately we have ingested a real-life dataset from the Ethereum blockchain that shows how this process works using a silly example.  Using the data we ingested, let’s find the top 5 products with the highest number of transactions since the start of August 2018.

 

SELECT value AS item_id,
       count(*) AS item_count
  FROM ethereum.token_transfers
 WHERE block_timestamp > "2018-08-01 00:00:00"
   AND token_address = "0x06012c8cf97bead5deae237070f9587f8e7a266d"
 GROUP BY value
 ORDER BY item_count DESC
 LIMIT 5;

 

This query returns the following results:

 

Item ID    Item Count
868345     32
901127     30
885882     29
885873     28
911063     24

 

If we tie this back to our pig example this would tell us that pig #868345 has had 32 transactions (interactions) in the past month.  It would be interesting to see what has happened with this pig and what makes it have more touch points than the others. We can now drill into ID #868345 by altering the query used above to:

 

SELECT t.block_number,
       x.transaction_index,
       t.transaction_hash,
       x.input
  FROM ethereum.token_transfers AS t
  JOIN ethereum.transactions AS x
    ON t.transaction_hash = x.hash
 WHERE t.block_timestamp > "2018-08-01 00:00:00"
   AND x.block_timestamp > "2018-08-01 00:00:00"
   AND t.token_address = "0x06012c8cf97bead5deae237070f9587f8e7a266d"
   AND t.value = "868345"
 LIMIT 3;

 

Block Number    Transaction Index    Transaction Hash                                    Input Data
6122589         68                   0xdd9707c25904cd5765b24d35f4387e468119ac5dc8b….     0xed6….
6067300         119                  0x3d544af6accdc45689edf7f8f73da878c01f382f7fb6c….   0x96b….
6066862         118                  0x0ba83d11148b21f02361b4a7190db8179af082ec55f….     0x96b….

 

This gives us the transactions associated to the ID and allows us an opportunity to drill into this data even further with the following query on transaction logs:

 

SELECT *
  FROM ethereum.logs
 WHERE block_timestamp >= "2018-08-01 00:00:00"
   AND transaction_hash = "0xdd9707c25904cd5765b24d35f4387e468119ac5dc8b6949b1ddd1e9146e1f388"

 

Block Number    Transaction Index    Transaction Hash    Log Index    Transaction Data
6122589         68                   0xdd9707c….         47           0x00000000000000000000000000000000000000000000000000000000000d3ff9
                                                                      000000000000000000000000000000000000000000000000006a94d74f43
                                                                      000000000000000000000000000006012c8cf97bead5deae237070f9587f8e7a266d
6122589         68                   0xdd9707c….         48           0x000000000000000000000000c7af99fe5513eb6710e6d5f44f9989da40f27f26
                                                                      0000000000000000000000009f60aa00b6531291702d0e1892ab31d5c150661c
                                                                      00000000000000000000000000000000000000000000000000000000000d3ff9
6122589         68                   0xdd9707c….         49           0x000000000000000000000000db416d496c855225f92cead56beac46fcdc60f28
                                                                      00000000000000000000000000000000000000000000000000000000000d65e2
                                                                      00000000000000000000000000000000000000000000000000000000000d3ff9
                                                                      00000000000000000000000000000000000000000000000000000000005d6d4d

 

At this point we have the binary representation of data needed to interact with the code on the Ethereum network and can extract information for this touchpoint.  Included in this are the parties involved, the time of the interaction, the function used to interact with Blockchain, and attributes tied to each step of the process.  In our supply chain example this might be information about a successful delivery or purchase tying all the information back to the distributed ledger.
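Log data of this kind is a sequence of 32-byte words, each printed as 64 hex characters. The sketch below splits a data string into words and decodes a word as an integer or as an address. Which interpretation applies to which word depends on the contract's event definition (its ABI), which is not shown in this post, so the helper names and the choice of decoder are assumptions for illustration.

```python
from typing import List


def split_words(data_hex: str) -> List[str]:
    """Split hex-encoded log data into 32-byte (64 hex character) words."""
    body = data_hex[2:] if data_hex.startswith("0x") else data_hex
    if len(body) % 64 != 0:
        raise ValueError("log data is not a whole number of 32-byte words")
    return [body[i:i + 64] for i in range(0, len(body), 64)]


def word_to_int(word: str) -> int:
    """Interpret a word as an unsigned integer."""
    return int(word, 16)


def word_to_address(word: str) -> str:
    """Interpret a word as an address: the low 20 bytes (40 hex characters)."""
    return "0x" + word[-40:]


# Rebuild a word like those in the table by left-padding with zeros to 64 characters.
item_word = "d3ff9".rjust(64, "0")
owner_word = "c7af99fe5513eb6710e6d5f44f9989da40f27f26".rjust(64, "0")

words = split_words("0x" + owner_word + item_word)
print(word_to_address(words[0]))  # 0xc7af99fe5513eb6710e6d5f44f9989da40f27f26
print(word_to_int(words[1]))      # 868345
```

The decoded integer 868345 is the same item ID used in the queries above, which is how a log entry can be tied back to the item being tracked.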

Admittedly this example is actually a bit of fun because while this transaction isn’t tied to a real pig the example is designed to tie back to a supply chain use case.  In fact this data is tied to a digital asset that exhibits similar properties to that of a supply chain to track attributes and ownership on the Ethereum Blockchain. Let’s take a peek at the digital item we have been digging into at the following address for item #868345:

Source: https://www.cryptokitties.co/

And the transaction details that we dug into are at the following address:

https://etherscan.io/tx/0xdd9707c25904cd5765b24d35f4387e468119ac5dc8b6949b1ddd1e9146e1f388#eventlog

Conclusion

Organizations that are able to build trust and collaborate will be able to grow their business and interact with their customers directly in ways not possible before.  By focusing on a simple example, we have seen the potential efficiency gains that Blockchains can provide and how Blockchain data can be integrated with the rest of the enterprise. While Blockchains are still early in the technology life cycle, the constant stress tests introduced by wider public adoption will only make the ecosystem more robust by improving on the building blocks already in motion.  If this excites you and it is a problem you are looking to solve for your business, please reach out!