From a nascent Apache project in 2006 to a commercially supported data platform offered by two public companies, Apache Hadoop has come a long way. Initially adopted by Web 2.0 companies such as Yahoo, Facebook, LinkedIn & Twitter, Hadoop-based data platforms started becoming a major part of enterprise data infrastructure around 2011. Today, a majority of Fortune 500 companies have adopted Hadoop data lakes. The maturity of Hadoop-native SQL query engines (such as Hive & Impala) and the availability of Hadoop-based BI and machine learning platforms (such as Apache Spark) have made Hadoop data lakes more accessible to traditional enterprise data practitioners. In addition, the availability of hosted Hadoop services on all major public cloud providers has allowed anyone with a credit card to start working with Hadoop, reducing the skills gap.

Having had the fortune of developing Hadoop from day one while I was at Yahoo, I have witnessed the various stages of Hadoop adoption in enterprises. Most enterprises first adopt a Hadoop-based platform for its cheap scale-out storage (HDFS), as an “Active Archive” or a staging data store. The second phase of adoption is to migrate workloads that can be easily parallelized away from expensive data warehouses, the most common example being Extract-Transform-Load workloads. The third phase is to perform advanced analytics at scale. Based on conversations with various customers and prospects over the last three years, we believe that most enterprises are somewhere between stages 2 & 3.

Previous State

A Hadoop data lake (either on-premises or in the cloud) is where raw data from other parts of the enterprise first lands. These raw datasets may consist of web-server & application-server logs, transaction logs from various front-end databases, periodic snapshots of dimensional datasets, and events & metrics from hardware or software sensors. IT operations (Data Ops) is responsible for ensuring that the raw data is loaded into these data lakes in a timely manner, and that data-cleansing workloads create “source of truth” data sets. This is the first stage of the ETL pipeline (sometimes called ELT, as raw data is loaded first, and then transformed).

Many transformations on these source-of-truth data sets involve complex joins across multiple datasets, imposing structure on semi-structured/unstructured data (flattening nested data), and segmentation (across geographies, demographics, products, or time intervals). After these transformations, multiple analyzable data sets are created. Next, pre-written business intelligence and reporting queries are run on these newly created datasets, and the results are uploaded to different systems that serve them to business analysts.
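To make the shape of these transformations concrete, here is a minimal Spark SQL sketch in Java; the dataset names, paths, and columns are illustrative assumptions rather than a prescribed pipeline:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CurateSourceOfTruth {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("CurateSourceOfTruth");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(jsc);

    // Raw, semi-structured clickstream events (nested JSON) landed by the ingest stage (hypothetical path).
    DataFrame rawEvents = sqlContext.read().json("hdfs:///raw/clickstream/2017-12-01/");
    // Periodic snapshot of a dimensional dataset, e.g. the user table (hypothetical path).
    DataFrame users = sqlContext.read().parquet("hdfs:///curated/dim_users/");

    rawEvents.registerTempTable("raw_events");
    users.registerTempTable("dim_users");

    // Flatten nested fields, join with the dimension table, and keep the columns analysts need.
    DataFrame sessions = sqlContext.sql(
        "SELECT e.session.id AS session_id, e.session.start_ts AS start_ts, " +
        "       u.country, u.age_group, e.page.url AS url " +
        "FROM raw_events e JOIN dim_users u ON e.user_id = u.user_id");

    // Segment by geography: one analyzable dataset per country for downstream BI/reporting queries.
    sessions.write().partitionBy("country").parquet("hdfs:///curated/sessions/");
  }
}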

For organizations that have begun to utilize advanced analytics (deep analytics, machine learning, etc.), these workloads, consisting of long-running chains of Hadoop jobs, are launched as “cron jobs”. Since most data lakes are operated & managed by IT operations, the analytics workloads are carefully vetted to make sure that they execute correctly, that the data lake has enough capacity to execute them in a timely manner, and that they do not bring down the entire cluster or other mission-critical workloads through misbehavior.

Challenges & Deficiencies

In addition to the “production data workloads” that run regularly (say, every hour or every day), there are many other workloads in any organization that are ad hoc in nature. (The Merriam-Webster dictionary defines ad hoc as “for the particular end or case at hand without consideration of wider application”.) Many data exploration, visualization, data science, and machine-learning model-building workloads are ad hoc, as these workloads do not have a set frequency, nor are they carefully tuned to extract performance.

As Hadoop data lakes become the single source of truth for data, various teams in the organization need access to the Hadoop data lake in order to perform these ad hoc workloads. These workloads, being experimental in nature, are typically time-bound. A team may run experiments on a subset of data from the data lake for a few weeks or months, and then either throw away their work (in case of an unsuccessful experiment) or productionize it (if successful and needed long term). The main challenge for Hadoop data lakes is the co-existence of mission-critical production data pipelines and ad hoc workloads on the same data lake.

In the early Hadoop deployments at Yahoo, we had to maintain several separate Hadoop clusters, some reserved for production workloads and some for ad hoc analytical use cases. However, keeping the data synchronized between the two remained a challenge (DistCp, a tool used to copy data between different Hadoop clusters, was the biggest consumer of resources among all users). In addition, because of the burstiness of the ad hoc workloads, either query latencies were unpredictable or cluster utilization was very low. This situation continues even today in many enterprises.

Prior to the advent of Hadoop data lakes, when a data warehouse was the single source of truth for enterprises, a similar challenge was solved by creating multiple “data marts”, typically one per division within an organization, to allow ad hoc analytics on a subset of data away from the data warehouse. In this usage pattern, Ampool plays the same role as a data mart, with the Hadoop data lake as the single source of truth.

With Ampool

The block diagram above shows the deployment & data flow when Ampool is used as a data mart for Hadoop data lakes; a code sketch of these steps follows the list.

  1. User specifies tables, partitions, ranges, and fields from data lake catalog
  2. Ampool executes queries on data lake to bulk load requested data
  3. (Or) Ampool bulk loads requested Hadoop files directly, enforcing schema on read
  4. Ampool presents loaded data as partitioned tables of structured/semi-structured data
  5. Analyst uses Hadoop-native query engines on Ampool to interactively query data
  6. (Or) Uses Ampool connectors for Python/R to access data
  7. Most tools & frameworks that integrate with Hadoop query engines can be used
  8. (Optionally) operational data is integrated in real-time (e.g. slow-changing dimension tables)
  9. Results published & shared through data lake
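
As a rough sketch of steps 1–5, the Spark connector described later in this series can pull a slice of the curated data lake into Ampool and query it interactively. The paths, table names, columns, filter, and locator address below are illustrative assumptions, not part of any specific deployment:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class PopulateDataMart {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("PopulateDataMart");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(jsc);

    // Ampool locator details for the Spark connector (see the connector post later in this series).
    Map<String, String> options = new HashMap<>();
    options.put("ampool.locator.host", "localhost");
    options.put("ampool.locator.port", "10334");

    // Steps 1-2: select the tables, partitions, and fields of interest from the data lake.
    DataFrame slice = sqlContext.read().parquet("hdfs:///curated/sessions/")
        .select("session_id", "country", "url")
        .filter("country = 'US'");

    // Steps 3-4: bulk load the selected slice into Ampool as a partitioned table.
    slice.write().format("io.ampool").options(options).save("sessions_us");

    // Step 5: interactively query the in-memory copy with a Hadoop-native engine (Spark SQL here).
    DataFrame mart = sqlContext.read().format("io.ampool").options(options).load("sessions_us");
    mart.registerTempTable("sessions_us");
    sqlContext.sql("SELECT url, count(*) AS hits FROM sessions_us GROUP BY url ORDER BY hits DESC").show();
  }
}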

Why Ampool?

Several features in Ampool enable this usage pattern.

  • Native integration with Hadoop-native query engines to perform parallel loads from data lakes (Hive, Spark etc.)
  • Integration with Hadoop authentication & authorization (Kerberos, LDAP, Apache Sentry)
  • High speed data-local connectivity for Hadoop-native query engines (Hive, Spark, Presto, EsgynDB)
  • Both Row-oriented & Column-oriented memory & disk layouts to efficiently execute queries
  • Support for Hadoop-native file formats (ORC & Parquet)
  • Polyglot APIs (Java, REST, Python, R)
  • Native integration with Apache Kafka & Apache Apex to rapidly ingest operational data from other data sources
  • Linear scalability & automatic load balancing at high performance
  • Smart bidirectional tiering to local disks, extending memory beyond working set
  • Hadoop-native deployment, management & monitoring (e.g. Cloudera Manager, Apache Ambari)

In addition, Ampool can be deployed on demand using Docker containers, and can be orchestrated with popular engines such as Kubernetes. (Deployment with Mesosphere DCOS is on the roadmap.) Since the data stored in Ampool is dynamically updateable, as the underlying data in the Hadoop data lake changes, those updates can be reflected in place on the Ampool data mart.

In the near future, we are working on making it even simpler to populate the Ampool data mart. Our goal is to make it as easy as working with a source code repository (e.g. git checkout, git pull, and git push).

When to consider Ampool as a high-speed data mart for your Hadoop data lake?

You should consider Ampool as an augmentation to your Hadoop data lake, if:

  • Your users are demanding access to the production Hadoop data lake for performing ad hoc analytics.
  • Your users are already familiar with any of the Hadoop-native query engines or compute frameworks, and do not want to learn new query languages just to perform ad hoc analytics.
  • These ad hoc analytics workloads, if successful, may need to be deployed some day on production data lakes, using the same Hadoop-native compute frameworks.
  • Your business analysts are impatient, and won’t wait months for IT to carve out separate Hadoop clusters and make data available to them.
  • Your users demand predictable latencies for their interactive workloads, and you do not want to spend excessive budget by over-provisioning your data lake.
  • You want your ad hoc analytics users to follow the same authentication & authorization & data governance policies as your production data lake.

If you are interested in exploring Ampool for this use case, write to us to schedule a demo.

Almost all applications are powered by one or more persistent data stores. Relational OLTP DBMSs have most often been used as data-serving backends. However, in recent years, for large scale-out applications, non-relational (NoSQL) data stores that sacrifice consistency for availability are being used for data serving. In most modern mobile or web-based applications, the front-end user experience is powered by multiple API calls to backend services, which are in turn backed by multiple data-serving platforms.

Previous State

Typical application architectures are composed of multiple tiers (sometimes with load balancers & proxies between these tiers): applications make API calls to the application servers; the application servers translate these API calls into multiple requests to the appropriate data-serving systems in the backend, collect responses from those backends, combine them using business logic, and send the result back to the application, which renders it on the front end. When there is temporal locality (data that was fetched recently will be fetched again soon), the data-serving backends are augmented with a lookaside in-memory caching layer, such as Redis or Memcached, in order to reduce load on the persistent data-serving systems.
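To illustrate the lookaside pattern, here is a minimal, generic Java sketch; a plain in-process map stands in for Redis/Memcached and a stub interface stands in for the persistent data store (both are illustrative assumptions, not any particular product API):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Generic lookaside cache: check the cache first, fall back to the persistent
// store on a miss, then populate the cache so the next read is served from memory.
public class LookasideCache<K, V> {
  private final Map<K, V> cache = new ConcurrentHashMap<>();  // stand-in for Redis/Memcached
  private final Backend<K, V> backend;                        // stand-in for the persistent data store

  public interface Backend<K, V> {
    V fetch(K key);              // e.g. a SELECT against the OLTP database
    void write(K key, V value);  // e.g. an UPDATE against the OLTP database
  }

  public LookasideCache(Backend<K, V> backend) {
    this.backend = backend;
  }

  public V get(K key) {
    V value = cache.get(key);
    if (value == null) {                 // cache miss: pay the backend latency once
      value = backend.fetch(key);
      if (value != null) {
        cache.put(key, value);           // populate for subsequent reads (temporal locality)
      }
    }
    return value;
  }

  public void put(K key, V value) {
    backend.write(key, value);           // write through to the source of truth
    cache.put(key, value);               // keep the cached copy consistent
  }
}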

Challenges & Deficiencies

As the number of applications and the number of concurrent users of each application increase, the data-serving systems are overwhelmed by the number of concurrent requests, and they tend to spend a lot of time (query latency) waiting for individual requests to complete. As a single application request may result in multiple requests to the underlying data stores, the application response is blocked until the last response from the data stores arrives. These data-fetch requests can be arbitrarily complex, and may result in a sequential scan of all records in the data store, exhibiting long-tail behavior.

We identify the core problem in such systems as the inability to efficiently construct a logical view over multiple data stores, serve these views concurrently to multiple applications, allow these views to be updated, and propagate those updates to the underlying data stores.

Adding an in-memory caching layer to each underlying data store takes advantage of temporal locality and speeds up data access from each individual data store; however, the application server still has to execute business logic and combine responses (or issue additional requests) before data is returned to the application. In addition, since a single-node lookaside cache is not fault-tolerant, when the cache goes down, applications see severely degraded performance. A number of distributed & fault-tolerant caching systems, such as Apache Geode, Apache Ignite, and Hazelcast, aim to solve the cache availability issue. These distributed caching systems are essentially distributed in-memory hash tables with a primary key and an arbitrary value, and they provide fast key-based lookups on this hash table. However, they do not solve the fundamental issue of serving updateable, materialized views that may require fast, short, scan-oriented queries.

With Ampool

The block diagram above illustrates the data flow when Ampool is used as application acceleration middleware.

  1. Ampool pre-loads initial datasets
  2. Construct in-memory materialized views, by embedding business logic as stored procedures
  3. Application requests data from View access layer
  4. View access layer parses request, adds metadata, and forwards to Ampool
  5. Ampool checks authorization at view-level, and serves projection/subset of view
  6. View access layer performs additional transformations
  7. Response is sent back to Application
  8. In case of updates, Ampool asynchronously updates the underlying databases

Why Ampool?

Ampool has many features that enable it to be used as a multi-tenant application acceleration platform.

  • Transparent and customizable bulk-loader functionality that can use JDBC, NoSQL (e.g. MongoDB), or file system APIs
  • Low-Latency (~20µs) lookups & in-place updates allow fast construction of materialized views, and synchronous updates
  • Pre-processors (similar to database triggers) allow customization of fine-grained authorizations
  • Row versioning enables fast lock-free inserts & updates
  • High throughput parallel query to rapidly serve materialized views with projection & filter pushdowns
  • Change Data Capture (CDC) mechanism to asynchronously update underlying tables
  • High query concurrency (~100s of concurrent clients) at low (sub-millisecond) latency
  • JDBC Connectivity with Apache Calcite integration

Please note that Ampool does not enforce distributed transactional updates across multiple non-collocated tables by default. However, by integrating Ampool clients with external transaction managers (such as Apache Omid or Apache Tephra), one can implement transactional behavior if needed.

When to consider Ampool for Application Acceleration?

You should consider Ampool as middleware for application acceleration if:

  • Your applications need to serve data concurrently from multiple RDBMS or NoSQL data stores (including file systems)
  • Your application servers are getting overwhelmed by business logic that results in sequential scans on the underlying data stores, and those queries exhibit long-tail behavior
  • Your business logic is written in Java (a current limitation of Ampool that will go away in the future)
  • Your data model consists of both structured & semi-structured data
  • Working set of your multi-tenant applications does not fit within a single machine’s memory

If you are interested in exploring Ampool, write to us to schedule a demo.

Wishing our blog readers a very happy new year 2018.

In the first blog series of this new year, we will outline three broad patterns where the Ampool Active Data Store (ADS) and In-Memory Platform are being used for real use cases by our customers and pilots. In this post, we describe the first usage pattern for Ampool: Near-App Analytics.

Previous State

Modern applications need to provide hyper-personalized experiences to their end users, so as to improve user engagement by providing the most relevant information. In order to achieve this, the application needs to accurately model user behavior while the user interacts with the application, and tailor the presented information to that activity, as opposed to a generic one-size-fits-all model. In traditional data lake architectures, user activity is logged by the applications and stored in log files on application servers. Periodically, this collection of user activity logs is transported and ingested into a centralized data lake as a set of raw data files. Later, in batches, these raw data files are cleansed & denormalized by combining them with other reference datasets, and user activity sessions are created. Activity-modelling algorithms using machine learning techniques are applied over a sliding window of time (for example, the last 30 days) on these datasets, and the model is evaluated against previously known and labelled user behavior to determine the efficacy of the new model. If the model is found to be effective & beneficial, it is uploaded to the application servers and applied when the user next visits the application.

Deficiencies

Because the user activity data is staged on “the edge”, i.e. the application servers, and goes through multiple transports, format conversions, batch ingestion, etc. before it finally lands in the centralized data lake and becomes available for analytics, the business value of the data, and the actionability of insights that could be gained from analyzing it, rapidly diminishes with time. In addition, because of the delay between user activity and the insights generated, the machine-learned user behavior models are often stale. Paul Maritz (Executive Chairman of Pivotal) succinctly described the core application of “Big Data” as the ability of “companies & organizations to catch people or things in the act of doing something and affect the outcome.” Clearly, feeding stale insights back to applications means losing the ability to catch users in the act of interacting with the application and to affect the outcome. The core problem that needs to be solved is to create “real-time, personalized, actionable information, in current context”. This is where the Ampool In-Memory Platform comes into the picture.

“Companies need to learn how to catch people or things in the act of doing something and affect the outcome”

-Paul Maritz, Executive Chairman, Pivotal

With Ampool

The block diagram above illustrates where the Ampool platform is used for Near-App Analytics, along with the data flow; a minimal producer sketch for step 1 follows the list.

  1. Application emits data exhaust (user activity events) on a message queue (e.g. Apache Kafka)
  2. Ampool connector for message queue fetches, sessionizes, & denormalizes events in Ampool
  3. Real-time Analytics (model refinement, dashboards, anomaly detection) performed in Ampool
  4. Results of analytics (models, visualization, alerts) emitted on message queue
  5. Serving store is updated with results of analytics
  6. Application uses results of analytics
  7. Colder data persisted in data lake for historical analytics (e.g. large scale batch model training)
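
Step 1 above is ordinary Kafka on the application side. As a minimal sketch (the topic name, key, serializers, and event payload are illustrative assumptions), the application could emit activity events like this:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ActivityEventProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);

    // One user-activity event, keyed by user id so all events for a user land in the same
    // partition, which keeps downstream sessionization (step 2) simple.
    String userId = "user-42";
    String event = "{\"userId\":\"user-42\",\"action\":\"click\",\"page\":\"/checkout\",\"ts\":1514764800000}";
    producer.send(new ProducerRecord<>("user-activity", userId, event));

    producer.flush();
    producer.close();
  }
}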

Why Ampool

The following features of Ampool enable Near-App Analytics seamlessly:

  • Native integration with message queues (e.g. Apache Kafka). An event stream published to a topic in the message queue immediately materializes as either a flowing table, or a materialized view in Ampool
  • Low-Latency (~20µs) lookups & in-place updates eliminate batch ETL, especially when reference (or dimensions) tables are already populated in Ampool
  • High throughput (~6.5 Billion 1Kilobyte events/day/node) ingestion & linear scalability of a distributed platform for most demanding applications
  • Programmable co-processors (triggers) for data cleansing, transformations, & denormalization, executed pre- and post-operation on data
  • Sub-second analytics, with projection pushdown, filter pushdown, & data-local computations
  • Efficient integration with analytical frameworks (e.g. Apache Spark, Apache Hive, Presto), eliminates need to learn new analytical computation frameworks
  • Change Data Capture (CDC) stream listener to push analytics results on message queue (or serving store)
  • Seamless & configurable tiering to colder storage in Hadoop-native file formats (Apache Parquet & Apache ORC) for historical analytics

In addition to the above-mentioned features, Ampool can be deployed in familiar application deployment frameworks, such as Docker containers, and can be orchestrated using modern platforms such as Kubernetes, reducing the need to learn new or unproven platforms.

If you are using Pivotal Gemfire (or Apache Geode) for the caching needs of your applications, you will find Ampool ADS (powered by Apache Geode) a natural fit for your near-app analytics needs.

When to consider Ampool for Near-App Analytics?

You should consider deploying Ampool for your near-app analytics needs, if:

  • Your scale-out applications generate huge volumes of structured or semi-structured data rapidly (several billions of events per day)
  • The delay between user activity and the resulting insights decreases the business value of the data generated
  • Your applications’ serving data stores (OLTP RDBMS, or NoSQL K-V store) are not able to perform rapid analytics
  • You cannot “close the loop” for analytics, for various reasons, including multiple slow staging environments, batch ETL, batch analytics, etc.

ICYMI (from December 2017, which was “so last year”):

If you are interested in exploring Ampool, write to us to schedule a demo.

We are excited to announce the Ampool 2.0 release, which is an important milestone for Ampool. This release includes many new features and important performance improvements.

The following major features have been added:

FTable Storage Format

The FTable employs a block strategy for its in-memory layer that groups multiple records together and minimizes the per-record storage overhead and the overall memory footprint. Thus, for append-only fact tables, logs, or metrics, it helps you make optimal use of available memory. With 2.0, the following three formats are supported.

  • AMP_BYTES: The rows are serialized using Ampool-specific encoding and are stored individually. This may consume more memory, but there is no additional overhead during a scan to retrieve a row.
  • AMP_SNAPPY: The rows are serialized using Ampool-specific encoding and compressed using Snappy compression upon reaching the specified block size. This helps reduce memory usage, but all rows have to be decompressed during a scan.
  • ORC_BYTES: All rows from a block are converted to the ORC columnar format upon reaching the specified block size. Each block then contains binary ORC data representing the rows, which is interpreted during a scan.

FTable Delta update:

FTable stores multiple records together and needs to propagate updates to the replicated copies and the persistence layer. The following two types of delta propagation have been added in this release.

  • Delta propagation to replicated copies: In case of updates to the same block, the entire block used to be replicated, which incurs a major network overhead. With this release, for both single-append and batch-append operations that update a block, only the appended records are propagated to the replicated copies. This reduces the network overhead and improves the performance of append operations on the FTable.
  • Delta propagation to the persistence layer: As FTable stores the rows in each bucket inside blocks of rows, all ingestion operations (append/batch append) update the same block until the block size is reached. In case of updates to the same block, the complete block value used to be written to disk rather than just the update to the block, causing a lot of disk writes and compaction. With this release, only the records appended to the block are propagated to the persistence layer, which reduces the disk writes and compaction.

Security Enhancements:

Ampool supports authorization to control access to data by authenticated users. The admin can control access to table data depending on the identity of the user attempting the operation. The following two types of authorization are supported:

  • Sentry Authorization: Apache Sentry is a centralized store of authorization data and enforces fine-grained, role-based authorization on data stored in data storage systems such as Ampool ADS.
  • LDAP Authorization: Ampool ADS leverages an LDAP server for user authentication and authorization. In this case, the LDAP server creates and manages users, and no information about users is stored in Ampool. Group/role information is managed both on the LDAP server and in Ampool.

Column Statistics:

For FTable, column statistics are generated per block and stored with the block. This helps skip unwanted blocks during scans that use filters. The stored statistics are the per-column min and max, and they are updated with each append/batch-append. Statistics are stored for these data types: INT, LONG, BYTE, SHORT, FLOAT, DOUBLE, DATE, TIMESTAMP, STRING. The column statistics can eliminate the need to scan or decompress a block at all when no matching row could be found in it.
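
As a toy illustration of the idea (this is generic sketch code, not Ampool's internal implementation), a filtered scan can skip any block whose per-block min/max range cannot contain a matching value:

import java.util.ArrayList;
import java.util.List;

// Toy illustration of block skipping: a scan with a predicate "value between lo and hi"
// only decodes blocks whose [min, max] range overlaps the predicate range.
public class BlockSkippingScan {
  static class Block {
    final long min;          // per-block column statistics, updated on each append
    final long max;
    final long[] rows;       // stands in for the (possibly compressed) block payload

    Block(long[] rows) {
      long mn = Long.MAX_VALUE, mx = Long.MIN_VALUE;
      for (long r : rows) { mn = Math.min(mn, r); mx = Math.max(mx, r); }
      this.min = mn; this.max = mx; this.rows = rows;
    }
  }

  static List<Long> scan(List<Block> blocks, long lo, long hi) {
    List<Long> result = new ArrayList<>();
    for (Block b : blocks) {
      if (b.max < lo || b.min > hi) {
        continue;                        // statistics prove no row can match: skip (and never decompress)
      }
      for (long r : b.rows) {            // only overlapping blocks are actually scanned
        if (r >= lo && r <= hi) result.add(r);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<Block> blocks = new ArrayList<>();
    blocks.add(new Block(new long[]{1, 5, 9}));      // min=1,  max=9   -> skipped for range [18, 27]
    blocks.add(new Block(new long[]{20, 25, 30}));   // min=20, max=30  -> scanned
    System.out.println(scan(blocks, 18, 27));        // prints [20, 25]
  }
}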

Also, the following known issues from previous releases are fixed in this release:

  • Added the ability to delete all versions of all keys qualified by a given filter list, without having to provide the key list.
  • MASH: Added a command to show the distribution of table data and buckets across both primary and secondary copies.
  • Support for lowercase type names in table schema.
  • Server scan performance improvement.

Performance Improvements with delta replication

Configuration:

Single Append operation on FTable
column-length=100
num-columns=10
redundancy=3 
Number of buckets : 113
FTable Block Size : 1000

Number of Rows | Append Time in Seconds (with Delta Replication) | Append Time in Seconds (without Delta Replication) | Speedup
20000 | 16 | 320 | ~20 Times
200000 | 164 | 3202 | ~20 Times
2000000 | 1688 | 32030 | ~19 Times

Performance Improvements with delta persistence

Configuration:

Server Nodes: 8
Heap size per server: 50GB
Number of buckets: 113
FTable Block Size: 1000
Client Batch Size: 1000
Redundancy: 3

All three runs ingest 50 Million rows (~40 GB) using five parallel clients.

With Delta Persistence:

Run | Ingestion Time (sec) | Total Size on Disk (GB) | Total Heap Size (GB) | Total Disk Writes, iostat (GB) | Oplog Files Created
1 | 949 | 72 | 69.6 | 59.67 | 72
2 | 945.8 | 72 | 70.4 | 59.66 | 72
3 | 941.2 | 72 | 69.6 | 59.66 | 72

Without Delta Persistence:

Run | Ingestion Time (sec) | Total Size on Disk (GB) | Total Heap Size (GB) | Total Disk Writes, iostat (GB) | Oplog Files Created
1 | 1168 | 104 | 70.4 | 90.76 | 1211
2 | 1127 | 104 | 70.4 | 92.93 | 1225
3 | 1146.2 | 104 | 70.4 | 91.66 | 1202

Reduction with Delta Persistence:

Run | Time (%) | Size on Disk (%) | Disk Writes (%) | Oplog Files (%)
1 | 18.75 | 30.76 | 34.25 | 94.05
2 | 16.07 | 30.76 | 35.79 | 94.12
3 | 17.88 | 30.76 | 34.91 | 94.00


Release notes are updated at http://docs.ampool-inc.com/core/RN2.0.0/

Click here to download the Ampool release 2.0.

Our Open Source project Monarch will be updated soon with Ampool 2.0 release changes, and this release will also be available on AWS Marketplace (both single node, and cluster mode) in early 2018.

Public cloud, once primarily used by agile startups, has taken the enterprise IT infrastructure by storm. Instant fulfillment, a wealth of services, pay-as-you-go model (in addition to reserved resources), multiple deployment options (bare metal, virtual machines, containers, and functions) have attracted application developers in startups and large companies alike to the public clouds.

At Ampool, we have been using Amazon’s public cloud, AWS, for development, testing, benchmarking, proofs of concept, & sharing artifacts from day one, thanks to the generous AWS Activate grant. While we have designed Ampool Active Data Store (ADS) as a cloud-agnostic in-memory computing platform, and have tested it on other public clouds, such as Google Cloud Platform (GCP) and IBM’s SoftLayer, customer inquiries about Ampool’s availability on AWS topped those for all other public clouds.

As a result, we are announcing today that Ampool ADS is now available on AWS Marketplace.

We are keeping our commitment that the single node version for development and testing will be free forever, and have listed a Free Single Instance AMI (EC2 Charges may apply) at https://aws.amazon.com/marketplace/pp/B077D81DD1. The single node version should not be used for production, since it lacks several capabilities for high availability & fault tolerance.

For production deployments, we have listed the Ampool Cluster Version at https://aws.amazon.com/marketplace/pp/B0784YHDW8, with a 31-day free trial. The Ampool Cluster edition is based on a CloudFormation template, which allows single-click deployment of Ampool ADS on EC2 instances.

Currently, version 1.5 of Ampool ADS is listed on AWS. We will be upgrading it to version 2.0, when we finish stress testing it.

Documentation of Ampool ADS for both on-premises and AWS deployments is available at http://docs.ampool-inc.com/.

Try it out, and send us feedback.

From relative obscurity two decades ago, Open Source Software (OSS) has come a long way and has become a dominating force in enterprises. Most modern data platforms, both operational and analytical, are built with OSS projects such as Hadoop, Cassandra, MongoDB, Spark, and Kafka. In our experience, many traditional enterprises in financial services, telecom, manufacturing, and many other verticals have taken an “Open Source First” approach. In addition, enterprise workloads moving from on-premises to public or private clouds are evenly divided between proprietary services provided by cloud vendors and open source software, either hosted by commercial vendors or self-deployed and managed. In Mary Meeker’s 2017 “State of the Internet Report”, lock-in to cloud-proprietary services is cited as a concern by 22% of enterprises, and that concern is rapidly growing. Therefore, hosted or self-managed services powered by open source software have become the choice of enterprises on public clouds.

Ampool’s Open Source Lineage

Ampool’s Active Data Store (ADS) is powered by Apache Geode (previously Pivotal’s proprietary Gemfire In-Memory Data Grid.) As Chief Scientist at Pivotal, I was deeply involved in defining Pivotal’s OSS strategy for its Data products, and facilitated open sourcing of Pivotal Gemfire as an Apache project, along with Ampool’s Technical Advisory Board member, Roman Shaposhnik.

Currently, Ampool’s Hitesh Khamesra, & Avinash Dongre are on the Project Management Committee (PMC) for Apache Geode, and Ampool has employed five committers for Apache Geode. In addition, Suhas Gogate, our Chief Solutions Architect, and I have been long-term contributors to Apache Hadoop ecosystem projects.

Ampool uses Apache Geode as a foundation, and all our additions are built on top of Geode. Thus, the entire functionality of the Geode In-Memory Data Grid, which is an in-memory caching layer and an object store for applications, is included and enabled in Ampool. As of now, Ampool ADS is strictly a superset of Apache Geode, with minimal changes to Apache Geode to enable the various additions that make it a performant & robust analytical memory-centric store.

In addition, many data access and persistence connectors to and from Ampool are built with OSS query engines, such as Apache Spark, Apache Hive, Apache Kafka, Apache Apex, Apache Trafodion, the Cask Data Analytics Platform, and the Apache Hadoop Distributed File System, with OSS file formats such as Apache ORC and Apache Parquet.

Ampool believes in the superiority of OSS as a distribution model, which reduces friction in adoption for developers of data platforms. However, we are also focused on building a viable business, which will allow us to innovate rapidly, meet the cutting-edge requirements of our customers, and provide them with ease of use, secure deployment, and painless management. One of the primary reasons Ampool ADS was developed over the last two years as a proprietary, closed-source addition to Apache Geode is that the speed of development in an open source community is significantly reduced by the consensus-driven approach, and would not have been satisfactory for the rapid development pace needed by a startup catering to the cutting-edge needs of our customers and prospects.

Having demonstrated that Ampool can meet those needs for our customers, I am pleased to announce today that Ampool Active Data Store, along with data access & ingestion connectors to several OSS query engines and data ingestion frameworks, is available as project “Monarch” on Ampool’s open GitHub repository at https://github.com/ampool/monarch under the Apache License (ASLv2).

Project “Monarch” currently contains code for both the Active Data Store and connectors for Hive, Kafka, Presto, & Spark. More connectors, such as Apache Calcite and HDFS (ORC & Parquet), will be released into OSS soon.

Currently, the OSS Monarch is based on Ampool v 1.5. We are working hard on releasing Ampool 2.0 version soon, which will be merged upstream into Monarch immediately after the 2.0 release.

If you only want to try out Ampool as a binary distribution for single node, you can download it, as before, from http://www.ampool.io/product.

Documentation for usage and deployment can be found at http://docs.ampool-inc.com/

If you need support using Ampool (powered by Monarch) email us at  support [at] ampool [dot] io.

In addition, we have a discussion group at https://groups.google.com/forum/#!forum/ampool-users.

Looking forward to feedback & contributions.

Apache Spark is a distributed computing framework with implicit parallelism and fault tolerance. Ampool ADS is a distributed in-memory store with built-in fault tolerance. Typically, Apache Spark caches data in its executor memory for faster processing and uses the underlying disk-based stores as and when needed. The cached data is immutable and is typically persisted to the disk-based stores and re-read for processing at a later point in time or by other Spark jobs. Ampool ADS complements Apache Spark by providing in-memory tables that Spark can directly access and process. This eliminates the need to cache large immutable datasets locally in each Spark job; instead, the data cached in Ampool ADS, being global, can be accessed by any Spark job at any point in time.

Ampool provides seamless integration between its memory tables and Spark DataFrames. A Spark DataFrame is a distributed collection of data organized into named columns, similar to Ampool memory tables. Spark’s SQL-like query functionality exposed through the DataFrame API (such as filters and aggregations) is transparently supported by Ampool on the data stored in Ampool ADS. Apart from loading the data frames from Ampool ADS, no change is required on the client side.

The diagram below shows the deployment of an Ampool ADS cluster along with a Spark cluster. Though the Spark cluster can access a remote Ampool ADS, it is recommended to co-locate the applications (Spark workers) with the respective Ampool ADS nodes. Also, once Ampool ADS is configured as a data store, it takes care of optimal access and efficiency, and the application need not worry about it.

A Spark cluster is typically deployed either in standalone mode or in an existing cluster managed by YARN or Mesos. A Spark cluster using Ampool ADS as an in-memory data store can take advantage of data locality if both are configured to co-locate on the same nodes.

The Ampool-Spark connector jars, along with the dependent libraries, are required when launching Spark jobs, either via the Spark shell or by submitting Spark applications to the cluster. When running Spark applications, the only mandatory configuration is to provide the Ampool cluster locator details: the locator host name and locator port. Spark takes care of distributing the connector jars to all executors; this can be avoided if the required connector jars are installed on the Spark worker nodes.

The Ampool-Spark connector enables you to:

  • Save an existing Spark data-frame as an Ampool table (by using the DataFrame column mapping)
  • Load an existing Ampool table as a Spark data-frame (by using the Ampool Table column mapping)
  • Load an existing Ampool table as DataFrame with following operations:
    • Filters
    • Aggregations
    • Column Selections
  • Load an existing Ampool table as data-frame, register it as a temporary table and then use Spark-SQL to query data from Ampool

The Ampool ADS, via the connector, supports the following data types as column types. These types are mapped to the respective Spark data-frame types.

  • Binary
  • Boolean
  • Byte
  • Date
  • Double
  • Float
  • Integer
  • Long
  • Short
  • String
  • Timestamp

Ampool also supports the following complex types containing one or more of the above basic types:

  • Array
  • Struct
  • Map

Along with the above types, the Spark ML Vector user-defined type is also supported as a native type by the connector. Both sparse and dense vectors can be read from and saved to Ampool like any other basic type.

Once all the required libraries are available and the cluster is running, Spark jobs can be launched (via spark-shell or spark-submit) as below:

$ <spark-home>/bin/spark-shell --jars <ampool-spark-jar>,<ampool-client-dependencies-jar>

The Ampool cluster (i.e. locator) details can be provided via following properties:

  • ampool.locator.host (default localhost)
  • ampool.locator.port (default 10334)

This configuration or any other configuration attributes can be provided as a map of key-values as below:

scala> val options = Map(("ampool.locator.host", "localhost"), ("ampool.locator.port", "10334"))

Following are some examples demonstrating how to save an existing data-frame to Ampool or load an existing Ampool table as a data-frame in Spark.

To save an existing DataFrame as an Ampool table, you can execute the following command (from spark-shell).

scala> val myDataFrame: DataFrame = ...
scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
scala> myDataFrame.write.format("io.ampool").options(options).save("ampool_table")

The above command will create an ordered Ampool MTable called ampool_table in an Ampool ADS (using localhost and 10334 as locator host and port respectively). The schema of the created table will be equivalent to the schema of myDataFrame.

If you have JSON data as a source, it can be converted to a data-frame and saved to Ampool ADS as a table.

scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
scala> val jsonDataFrame = sqlContext.read.json(path)
scala> jsonDataFrame.write.format("io.ampool").options(options).save("json_table")

If you already have an Ampool table, it can be loaded as a data-frame in Spark (from spark-shell):

scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
scala> val ampoolDataFrame = sqlContext.read.format("io.ampool").options(options).load("ampool_table")
scala> ampoolDataFrame.show()
scala> ampoolDataFrame.filter("size > 4096").show()

The above command will create a Spark data-frame from the existing ampool_table. Once the table is loaded as a data-frame, we can execute any supported data-frame operation on it, which will eventually translate into queries that retrieve data from Ampool ADS as required.

Once the Ampool table is loaded as a data-frame, you can register it as a temporary table and then use Spark SQL to query data from Ampool ADS (from spark-shell):

scala> val options = Map(("ampool.locator.host","localhost"), ("ampool.locator.port","10334"))
scala> sqlContext.read.format("io.ampool").options(options).load("ampool_table").registerTempTable("my_table")
scala> val results = sqlContext.sql("select * from my_table where size = 4096")
scala> results.show()
scala> println(results.count())
scala> results.foreach(println)


You can also use custom Spark UDFs with the Ampool connector. You can register a custom UDF, say SampleEqualsUDF, with the SQL context, along with the appropriate return type. Once registered, it can be used like any other built-in UDF in SQL queries, as below:

scala> val df = sqlContext.read.format("io.ampool").load("ampool_table")
scala> df.registerTempTable("my_table")
scala> sqlContext.udf.register("MyEquals", SampleEqualsUDF.SAMPLE_EQUALS_UDF, DataTypes.BooleanType)
scala> sqlContext.sql("select * from my_table where MyEquals(column_1, 'constant_value')").show()

Continuing from our previous post on mutable tables, Introducing MTable, we now see how we can interact with this data abstraction through REST APIs.

The Ampool developer REST interface runs as an embedded HTTP or HTTPS service (Jetty server) within an Ampool ADS server. For documentation purposes, the developer REST APIs are also integrated with the Swagger™ framework. This framework provides a browser-based test client that allows you to visualize and try out the REST APIs before you use them in your application. Swagger application JARs are included in the Ampool ADS distribution, so you do not need to install any additional libraries to use it.

The Ampool MTable REST APIs allow you to perform multiple operations on an MTable, such as create, delete, insert, scan, list, and count. Following is the complete list of operations available through these APIs:

  1. List (GET) all MTables
  2. Insert (POST) single record
  3. Insert (POST) multiple records
  4. GET the total number of entries in the MTable
  5. GET value for a single key
  6. GET values for multiple keys
  7. DELETE a single record
  8. DELETE multiple records
  9. DELETE the entire MTable

For more details about these REST endpoints, please check the online docs to understand how to enable and work with them. Once you start the Ampool ADS server with REST enabled, the Swagger UI will be rendered on port 8080 (by default) on the host machine.

The Swagger UI can be used to perform MTable operations using the REST endpoints. The picture below displays an MTable PUT operation from the Swagger UI. A sample record is inserted into an MTable against key 9.

The REST APIs can also be accessed from the command line using a client such as curl. The example below uses the MTable GET REST endpoint to fetch the record inserted from the Swagger UI:

So, try interacting with MTables the REST way and post any questions or suggestions that you may have.

Continuing from the previous post, in this post we will demonstrate some of the APIs to create and access data in an MTable. The example below uses the Java APIs to create and populate the data, and the Spark DataFrame/SQL API to query data from the MTable. Note: Spark SQL also supports Hive query syntax and UDFs.

Use case

Let us use a sample data set providing information on various hotels, including their pricing, amenities, and customer reviews. We will use it to build a portal to find hotels in a certain price range, to get the reviews for a particular hotel, to get a list of hotels within a price range and with a specific rating, and so on. Sample data can be downloaded from the TripAdvisor Data Set. Complete example code is available in Ampool’s public git repository.

Let’s define two tables, HotelInfoTable and RatingReviewTable, with the schemas given below.

HotelInfoTable

Field            Type
name             String
hotelURL         String
priceRangeStart  Double
priceRangeStop   Double
address          String
HotelID          Integer
imgURL           String

RatingReviewTable

Field                Type
contents             String
date                 Date
author               String
service              Integer
business_service     Integer
cleanliness          Integer
check_in_front_desk  Integer
overall              Double
value                Integer
rooms                Integer
location             Integer
HotelID              Integer


Let’s get to the code

Step 1 : Setup Ampool Cluster

At a minimum, the user is expected to have a single-node Ampool cluster and a Spark cluster installed. The Spark Scala shell will be used to run the Spark queries on data frames pointing to the underlying Ampool tables.

Ampool Data Store can be downloaded from

Use Java JDK 1.8

Step 2 : Java Client: Create Client Cache
The following code for creating a client cache actually gets a handle to Ampool’s server-side memory pool. Although there is a way to do so, it does not implicitly cache any server-side table data on the client (hence “client cache” is a bit of a misnomer in this case). To get the handle to Ampool’s server-side memory pool, create an MConfiguration object providing Ampool’s locator host and port number.

MConfiguration mconf = MConfiguration.create();
mconf.set(Constants.MonarchLocator.MONARCH_LOCATOR_ADDRESS, "127.0.0.1");
mconf.setInt(Constants.MonarchLocator.MONARCH_LOCATOR_PORT, 10334);
MClientCache clientCache = new MClientCacheFactory().create(mconf);

Step 3 : Java Client: Define an MTableDescriptor.
In Ampool Active Data Store, the MTableDescriptor class defines the MTable schema and sets additional properties such as table type, persistence policy, max versions for each row, number of table splits (data buckets), split range, etc. See the Online Documentation for more details.

In this case we use MTables of type UNORDERED for both tables.

// Descriptor for HotelInfoTable
MTableDescriptor hotelInfoTableDescriptor = new MTableDescriptor(MTableType.UNORDERED);
hotelInfoTableDescriptor.enableDiskPersistence(MDiskWritePolicy.ASYNCHRONOUS);

hotelInfoTableDescriptor.addColumn("name", MBasicObjectType.STRING);
hotelInfoTableDescriptor.addColumn("hotelURL", MBasicObjectType.STRING);
hotelInfoTableDescriptor.addColumn("priceRangeStart", MBasicObjectType.DOUBLE);
hotelInfoTableDescriptor.addColumn("priceRangeStop", MBasicObjectType.DOUBLE);
hotelInfoTableDescriptor.addColumn("address", MBasicObjectType.STRING);
hotelInfoTableDescriptor.addColumn("HotelID", MBasicObjectType.INT);
hotelInfoTableDescriptor.addColumn("imgURL", MBasicObjectType.STRING);
// Descriptor for RatingReviewTable
MTableDescriptor ratingReviewTableDescriptor = new MTableDescriptor(MTableType.UNORDERED);
ratingReviewTableDescriptor.enableDiskPersistence(MDiskWritePolicy.ASYNCHRONOUS);

ratingReviewTableDescriptor.addColumn("contents", MBasicObjectType.STRING);
ratingReviewTableDescriptor.addColumn("date", MBasicObjectType.DATE);
ratingReviewTableDescriptor.addColumn("reviewId", MBasicObjectType.STRING);
ratingReviewTableDescriptor.addColumn("author", MBasicObjectType.STRING);
ratingReviewTableDescriptor.addColumn("service", MBasicObjectType.INT);
ratingReviewTableDescriptor.addColumn("business_service", MBasicObjectType.INT);
ratingReviewTableDescriptor.addColumn("cleanliness", MBasicObjectType.INT);
ratingReviewTableDescriptor.addColumn("check_in_front_desk", MBasicObjectType.INT);
ratingReviewTableDescriptor.addColumn("overall", MBasicObjectType.DOUBLE);
ratingReviewTableDescriptor.addColumn("value", MBasicObjectType.INT);
ratingReviewTableDescriptor.addColumn("rooms", MBasicObjectType.INT);
ratingReviewTableDescriptor.addColumn("location", MBasicObjectType.INT);
ratingReviewTableDescriptor.addColumn("HotelID", MBasicObjectType.INT);

Step 4 : Java Client: Creating Tables
In Ampool Data Store, creating a table is an admin operation; an admin handle can be obtained using the clientCache.getAdmin() API.

MTable ratingReviewTable = clientCache.getAdmin().createTable("RatingReviewTable", ratingReviewTableDescriptor);
MTable hotelInfoTable = clientCache.getAdmin().createTable("HotelInfoTable", hotelInfoTableDescriptor);

Step 5 : Java Client: Populating Tables
The MPut class is used to define the row key and column values for each record to be inserted into an MTable. MPut objects can also be batched as a List, and the whole batch of records can be inserted in a single put operation.

The following code shows how to populate a single record in both “HotelInfoTable” and “RatingReviewTable”. You can use a loop to insert multiple records.

// Create MPut object w/ row key for the HotelInfoTable record
MPut putRecord = new MPut(Bytes.toBytes(hotelInfoObj.getHotelID()));

// Start adding columns with Column Name and Column Values.
putRecord.addColumn("name", hotelInfoObj.getName());
putRecord.addColumn("hotelURL", hotelInfoObj.getHotelURL());
putRecord.addColumn("priceRangeStart", hotelInfoObj.getPriceRangeStart());
putRecord.addColumn("priceRangeStop", hotelInfoObj.getPriceRangeStop());
putRecord.addColumn("address", hotelInfoObj.getAddress());
putRecord.addColumn("HotelID", hotelInfoObj.getHotelID());
putRecord.addColumn("imgURL", hotelInfoObj.getImgURL());

// perform put operation
hotelInfoTable.put(putRecord);
// Create MPut object w/ row key for the RatingReviewTable record
MPut putRecord = new MPut(reviewInfo.getReviewId());

// Start adding columns with Column Name and Column Values.
putRecord.addColumn("contents", reviewInfo.getContent());

SimpleDateFormat sdf = new SimpleDateFormat("MMMM d, yyyy");
final Date date = sdf.parse(reviewInfo.getDate());
putRecord.addColumn("date", new java.sql.Date(date.getTime()));

putRecord.addColumn("reviewId", reviewInfo.getReviewId());
putRecord.addColumn("author", reviewInfo.getAuthor());

RatingsInfo ratingsInfo = reviewInfo.getRatingsInfo();
putRecord.addColumn("service", ratingsInfo.getService());
putRecord.addColumn("business_service", ratingsInfo.getBusiness_service());
putRecord.addColumn("cleanliness", ratingsInfo.getCleanliness());
putRecord.addColumn("check_in_front_desk", ratingsInfo.getCheck_in_front_desk());
putRecord.addColumn("overall", ratingsInfo.getOverall());
putRecord.addColumn("value", ratingsInfo.getValue());
putRecord.addColumn("rooms", ratingsInfo.getRooms());
putRecord.addColumn("location", ratingsInfo.getLocation());

putRecord.addColumn("HotelID", hotelInfoObj.getHotelID());

ratingReviewTable.put(putRecord);

Step 6 : Querying MTable using Spark
Let’s see how one can point Spark data frames at Ampool MTables and query the data using Spark SQL.

Ampool versions 1.1 & 1.2 support Spark 1.6.1

final String locatorHost = "localhost";
final int locatorPort = 10334;

SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("SparkQueryRunner");
JavaSparkContext jsc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(jsc);
Map<String, String> options = new HashMap<>(3);
options.put("ampool.locator.host", locatorHost);
options.put("ampool.locator.port", String.valueOf(locatorPort));

DataFrame hotelInfoDF = sqlContext.read().format("io.ampool").options(options).load(HOTEL_INFO_TABLE);
DataFrame ratingReviewDF = sqlContext.read().format("io.ampool").options(options).load(RATING_REVIEW_TABLE);

hotelInfoDF.show();
ratingReviewDF.show();

hotelInfoDF.registerTempTable("HI");
ratingReviewDF.registerTempTable("RR");

sqlContext.sql("select name from HI").show();
sqlContext.sql("select name, priceRangeStart from HI where priceRangeStart > 100").show();

final DataFrame dataFrame = sqlContext.sql("select first(HI.HotelID) AS HOTELID, first(HI.name) AS NAME, AVG(RR.overall) as RATING, first(HI.priceRangeStart) AS STARTPRICE, first(HI.priceRangeStop) AS STOPPRICE from HI, " +
"RR where HI.HotelID = RR.HotelID and HI.name != 'null' and HI.priceRangeStart > 150.0 and HI.priceRangeStop < 200.0 and HI.priceRangeStart < HI.priceRangeStop " +
"group by HI.HotelID order by RATING DESC");

dataFrame.filter("RATING > 3.0 and RATING < 5.0").show();

Complete example code is located here. You will need to update the variable ‘DATA_DIRECTORY’ in io.ampool.MainRunner.java to point to the location where you downloaded the TripAdvisor data.

So give it a try!


The previous post discussed mutable data in Ampool Active Data Store (ADS). Here we will discuss how Ampool ADS enables you to deal with very large volumes of immutable data. FTable stands for “flow table”, and it enables fast ingestion of very large amounts of immutable data (a.k.a. facts data). The data is internally stored in multiple storage tiers and moved across them seamlessly whenever required. The storage tiers are typically arranged in ascending order of latency and descending order of cost (per GB). Newer data is accessed much more frequently than older data. Thus, the high-demand data is stored in the tier with the lowest latency, whereas data with lower demand is stored in tiers with higher latency but lower cost.

The Ampool FTable nicely complements MTable in data warehousing use cases, where mutable dimension data is stored in MTable while large, ever-growing facts data is stored in FTable with tiered storage. By separating these two types of tables and keeping FTable primarily immutable, we avoid the need for background data compaction and thus use SSD/flash storage more effectively by eliminating the write-amplification phenomenon.

Similar to MTable, FTable also provides partitioning, redundancy, persistence, and support for a table schema with basic and complex data types. The data is partitioned across the cluster nodes using hash-based partitioning on specified column values as the partition key. The table can also be configured with redundancy for availability & fault tolerance. It also provides an option to persist the in-memory data to persistent local disk storage so that it can be recovered in case of node failures/restarts.

Being an immutable table, at a high level FTable only supports append and scan operations. (Although, to curate occasional data inconsistencies, it does provide administrative operations like bulk delete/update with arbitrary filter criteria.) Append enables you to ingest data at very high rates; either single records or batches of records can be inserted. A typical key-value store has a constant overhead per entry. The FTable employs a block strategy for its in-memory layer that groups multiple records together and minimizes the per-record overhead and the overall memory footprint. Thus, for typical append-only tables, it helps you make optimal use of available memory.

FTable supports INSERT_TIME as a built-in column so that users can efficiently retrieve data over a specified time range without having to scan the entire table. When data is ingested, Ampool records the insertion time along with each record. This insertion time is used internally as an implicit order for storing the data within a partition. This speeds up real-time range queries based on the insertion time, e.g. get all data for the past N hours.

With FTables you can configure a hierarchy of tiers. In the recommended configuration, the first tier is always a memory tier, the second tier is shared-nothing local disk/SSD based, and the third tier is an archive tier using shared stores such as HDFS/S3. FTable can also be configured to move data from one tier to the next based on specific policies. Both time-based and space-usage-based policies can be configured per tier, per table. As data exceeds the time or space threshold, it is automatically moved to the next tier. This allows you to make more optimal use of the available resources in the respective tiers. Ampool FTable uses open data storage formats, e.g. ORC and Parquet, to store the data on persistent tiers such as local disk and HDFS/S3.

Scan is the other key operation on FTable. It allows you to retrieve selected records by applying a set of arbitrary filters on specific columns. The scan is a seamless operation that spans all the configured tiers, as applicable, and retrieves data from all tiers transparently. The scan operation exploits data locality by applying the filter where the data resides, so only the matching results are sent over the network.

In modern applications that support both real-time access and historical analysis, recent data is accessed more frequently and in a real-time manner, while older data is utilized in a batch-oriented way. FTable’s tiered storage policy serves both types of workloads very effectively, eliminating the need for the multiple types of data stores used in a lambda architectural pattern.

Similar to MTable, FTable also supports the MASH shell, Java APIs, and connectors to multiple compute frameworks such as Apache Spark and Apache Hive for programmatic access to the tiered data store. In subsequent posts we will deep-dive into the details of these access mechanisms.
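
For example, assuming the same io.ampool Spark connector format shown in the earlier Spark posts also works for FTables (an assumption here; consult the connector documentation for specifics), loading a tiered fact table as a DataFrame and running a time-range query might look like this (the table name and timestamp value are hypothetical):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class FTableSparkRead {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("FTableSparkRead");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(jsc);

    Map<String, String> options = new HashMap<>();
    options.put("ampool.locator.host", "localhost");
    options.put("ampool.locator.port", "10334");

    // "events_ftable" is a hypothetical fact table; the filter on the built-in
    // INSERT_TIME column illustrates a time-range query over recent data.
    DataFrame events = sqlContext.read().format("io.ampool").options(options).load("events_ftable");
    events.registerTempTable("events");
    sqlContext.sql("SELECT count(*) FROM events WHERE INSERT_TIME > 1514764800000").show();
  }
}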