Google Cloud Dataflow


Continuing on the Big Data theme, Google Cloud Dataflow is the next Google Cloud Platform component I want to take a look at.

What is Dataflow?

  • Dataflow is mainly for batch or stream data processing.
  • Good for high-volume computation and embarrassingly parallel workloads.
  • Consists of 2 major components:
    1. Dataflow SDKs: A programming model and SDKs for large-scale cloud data processing.
    2. Dataflow Service: Ties together and fully manages several different Google Cloud Platform technologies to execute data processing jobs in the cloud.
  • Dataflow SDK is being open sourced as Apache Beam.

Dataflow Programming Model

Dataflow Programming Model consists of 4 concepts:

  1. Pipelines: A set of operations that reads a source of input data, transforms it and writes out the output. A pipeline contains data (PCollections) and processing on that data (Transforms).
  2. PCollections: Inputs and outputs for each step in the pipeline. Immutable after creation. 2 flavors:
    • Bounded: Fixed-size data set for text, BigQuery, Datastore or custom data.
    • Unbounded: Continuously updating data set, or streaming data such as Pub/Sub or custom data.
  3. Transforms: A data processing operation, or a step, in the pipeline. Takes PCollection as input and produces PCollection as output. 2 flavors:
    • Core: You provide the processing logic as a function object. There are 4 core transform types: ParDo, GroupByKey, Combine and Flatten.
    • Composite: Built from multiple sub-transforms.
  4. I/O Sources and Sinks: Source APIs to read data into the pipeline, and sink APIs to write output data from your pipeline. APIs for common formats such as:
    • Text files
    • BigQuery tables
    • Avro files
    • Pub/Sub
    • BigTable (beta)
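To make the four concepts concrete, here is a plain-Java analogy built on java.util.stream. It is not the Dataflow SDK (or Apache Beam) API, and the class and method names are hypothetical. Each step only mimics the role of a Dataflow concept: an in-memory list stands in for a bounded source, splitWords plays the part of a ParDo, countPerWord plays the part of a GroupByKey followed by a Combine, and printing stands in for a sink.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration of the Dataflow programming model in plain Java.
public class WordCountModelSketch {

    // ParDo-like step: each input element (a line) is processed independently
    // and produces zero or more output elements (words).
    static List<String> splitWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    // GroupByKey + Combine-like step: group identical words (the key)
    // and combine each group into a single count.
    static Map<String, Long> countPerWord(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Bounded "source": a fixed, in-memory data set instead of a text file.
        List<String> input = List.of("The quick brown fox", "the lazy dog");

        // "Pipeline": each step returns a new collection and leaves its input untouched.
        Map<String, Long> counts = countPerWord(splitWords(input));

        // "Sink": print instead of writing to a file or a BigQuery table.
        counts.forEach((word, n) -> System.out.println(word + ": " + n));
    }
}
```

Because every step returns a new list instead of modifying its input, the sketch loosely mirrors the immutability of PCollections. Among other things, the real SDK adds distribution of these steps across many workers and support for unbounded (streaming) input.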

Dataflow SDKs

Two supported languages:

  1. Java: Dataflow SDK for Java is fully available
  2. Python: Dataflow SDK for Python is in development.

Dataflow Service

  • Dataflow Service is a managed service in Google Cloud Platform to deploy and execute Dataflow pipelines (as Dataflow jobs).
  • Simplifies distributed parallel processing by:
    1. Automatic partitioning of your data and distribution of the work across Compute Engine instances.
    2. Optimization of the pipeline.
    3. Automatic scaling of resources as needed.
  • Automatically spins up and tears down the resources (Compute Engine, Cloud Storage) needed to run the Dataflow job.
  • Provides tools like Dataflow Monitoring Interface and the Dataflow Command-line Interface.


Google App Engine


Intro

The next component I want to explore in the Compute layer of Google Cloud Platform (GCP) is App Engine. As usual, App Engine Docs is the best place to learn more but I want to provide shorter notes here. Being a Java guy myself, I also want to emphasize aspects of App Engine relevant to Java.

What is App Engine?

  • Platform as a Service (PaaS) offering of Google Cloud.
  • Provides managed runtimes for specific versions of Java, Go, PHP, Python.
  • Mainly for web and mobile apps.
  • App Engine provides:
    • Automatic scaling and load balancing.
    • Access to other App Engine services such as Datastore, Blobstore, Memcache, Endpoints, Task Queues, Scheduled Tasks, Search API, Logs API and more.
  • Each language comes with an App Engine SDK for local development, testing and deployment.
  • There are 2 flavors of App Engine:
    1. Sandbox: Provides a sandboxed (and limited) environment to run your web app.
    2. Managed VM (beta): Built on top of Compute Engine; provides more freedom over the environment.

Sandbox

In the default flavor of App Engine, your Java app needs to conform to these sandbox constraints in order to run on App Engine:

  • App needs to run on Java 7 and use only a limited subset of the Java standard library.
  • App cannot write to the file system (use Datastore instead).
  • No arbitrary network connections.
  • No slow responses: A web request must be handled within a reasonable amount of time (the exact limit depends on the scaling mode).
  • Limited threads and background threads (depends on the scaling mode).

App Engine (Sandbox) and Java

  • Runs your Java web application using Java 7 in a sandboxed environment.
  • Uses the Java Servlet 2.5 standard.
  • App Engine Java SDK for development. It also has a plugin for supporting development with Apache Maven.
  • Development server for local development and testing.
  • IDE support: Google Plugin for Eclipse lets you manage App Engine directly from Eclipse. Additionally, there is Google App Engine Integration for IntelliJ and a NetBeans plugin for the Gaelyk framework.
  • Java apps are deployed as App Engine Modules where each module can act like a microservice.
  • In terms of configuration:
    • Top level app is packaged as EAR (similar to JEE).
    • Individual modules are packaged as WAR within EAR (again similar to JEE).
  • Each module can independently choose its own scaling type (how each instance is created and scaled).
    • 3 scaling types: Manual, Basic and Automatic.
  • Each scaling type has its own instance classes. The instance class determines compute resources and pricing:
    • Manual: B1, B2, B4, B4_1G, or B8.
    • Basic: B1, B2, B4, B4_1G, or B8.
    • Automatic: F1, F2, F4, or F4_1G.

Managed VM

  • Managed VM is in beta.
  • Based on Google Compute Engine, allows one to customize runtime and OS (beyond what Sandbox provides) using Docker while providing the benefits of App Engine (automatic scaling, load balancing etc.)
  • Native support for Java 8 / Servlet 3.1, Java 7 / Jetty 9, Python 2.7 and Python 3.4, Node.js, and Go. Further customization with Docker.
  • Google Cloud SDK is needed to administer Managed VMs.


Compute Engine Java API

The usual way of managing Compute Engine instances is from the GCP Console. However, if you need programmatic access to Compute Engine, there is a comprehensive REST API. There are also client libraries (that basically wrap the REST API) provided by Google and the community, for Java, .NET, Go, JavaScript, Ruby and more.
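For comparison with the client libraries, here is a minimal sketch of calling the REST API directly with the JDK's own HTTP client (java.net.http, Java 11+). It calls the instances.list method of the Compute Engine v1 API. Assumptions: the class name is hypothetical; the project ID and zone are placeholders you supply; and the OAuth 2.0 access token is read from an ACCESS_TOKEN environment variable that you set yourself (for example from `gcloud auth print-access-token`). Obtaining and refreshing such tokens is part of what the client libraries handle for you.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical sketch: list Compute Engine instances via the raw REST API.
public class ListInstancesRestSketch {

    // Builds the Compute Engine v1 instances.list URL for one project and zone.
    static String instancesListUrl(String projectId, String zone) {
        return "https://compute.googleapis.com/compute/v1/projects/"
                + projectId + "/zones/" + zone + "/instances";
    }

    public static void main(String[] args) throws Exception {
        // Placeholder values; pass your own project id and zone as arguments.
        String projectId = args.length > 0 ? args[0] : "my-project";
        String zone = args.length > 1 ? args[1] : "europe-west1-c";

        // Assumption: an OAuth 2.0 access token was exported as ACCESS_TOKEN.
        String token = System.getenv("ACCESS_TOKEN");
        if (token == null) {
            System.err.println("Set ACCESS_TOKEN to an OAuth 2.0 access token first.");
            return;
        }

        HttpRequest request = HttpRequest.newBuilder(URI.create(instancesListUrl(projectId, zone)))
                .header("Authorization", "Bearer " + token)
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        System.out.println("HTTP " + response.statusCode());
        System.out.println(response.body()); // JSON describing the instances in that zone
    }
}
```

Everything the client library gives you beyond this (typed model classes, JSON parsing, credential storage and refresh) has to be done by hand in this approach, which is why the libraries are usually the more convenient choice.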

In this post, I want to take a look at the Java client libraries. There are 2 client libraries for Java:

  1. Apache JClouds: This is an open source library that allows one to use Java across multiple Cloud providers, including Compute Engine on Google Cloud. This is a good option if you want to work with multiple cloud vendors from the same API.
  2. Google API Client Library for Java: Provided by Google, this is a Java library for accessing all Google APIs (not just Google Cloud), including Compute Engine on Google Cloud.

A good starting point to explore Google API Client Library for Java is Compute Engine Cmdline Sample. This sample basically contains a single class, ComputeEngineSample, that lists all Compute Engine instances you have in your project from Java. Nothing earth-shattering, but it shows how to authenticate with Google APIs and interact with Compute Engine.

There’s an instructions.html file that tells you how to set up and use the sample. One thing you’ll notice is that you need to grant API access to your Java app before you can run the sample, but instructions.html is a little outdated on that point. This is what you need to do to grant API access:

  1. Either create or go to your project from Developers Console.
  2. Open side-menu on top left and go to “API Manager”.
  3. Under “API Manager”, click on “Credentials” -> “Create Credentials” -> “OAuth Client ID”.
  4. Now, you need to select the application type. In our case, select “Other” and give it a name.
  5. At this point, you’ll have a pop-up with client ID and secret. Copy these down as you’ll need them later.

Once that’s done, you can import the project into Eclipse (or your favorite IDE), fill in client_secrets.json in the resources folder with the client ID and secret from step 5, and fill in the following info in ComputeEngineSample:

  • APPLICATION_NAME: Optional but give it a name, so you don’t get warnings.

  • projectId: Your project id from Developers Console.

  • zoneName: Make sure to enter the right zone for your project. In my case, I chose europe-west1-c.

That’s it. Now, you can run ComputeEngineSample from Eclipse (or your favorite IDE) and you can get info about your Compute Engine instances right from Java:

================== Listing Compute Engine Instances ==================
{
  "canIpForward" : false,
  "creationTimestamp" : "2016-02-16T05:58:00.714-08:00",
  "description" : "",
  "disks" : [ {
    "autoDelete" : true,
    "boot" : true,
    "deviceName" : "instance-1",
    "index" : 0,
    "kind" : "compute#attachedDisk",
    "mode" : "READ_WRITE",
    ....

 

When a daemon thread is not so daemon

As you know, threads in Java can be marked as daemon via Thread#setDaemon. The main difference between a normal and a daemon thread is that the JVM exits when the only threads still running are daemon threads. Daemon threads are usually used as service providers for normal threads running in your application, since they don’t hold up the shutdown of your application the way a normal thread can.

So, you might be tempted to mark a thread as daemon and blindly assume that it will never block the shutdown of your application. While this is true in most cases, as always, there are exceptions. Consider the following piece of code:

    import java.util.concurrent.TimeUnit;

    public class DaemonJoinDemo {
        public static void main(String[] args) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    int i = 0;
                    while (true) {
                        System.out.println(i++);
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            t.setDaemon(true); // must be called before start()
            t.start();

            try {
                t.join(); // the (non-daemon) main thread blocks here forever
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("end"); // never reached
        }
    }

A daemon thread is created which simply prints a counter every second. Then there’s a call to Thread#join to wait for the daemon thread to finish, after which the main thread prints “end”. Well, “end” will never be printed and the application will never exit: the daemon thread keeps printing the counter forever, so Thread#join blocks forever. This might be surprising at first glance. Isn’t this a daemon thread? Isn’t it supposed to not block the JVM? It is, but the main thread is not a daemon, and by calling Thread#join it is now waiting on the daemon thread. As long as the main thread is alive the JVM cannot exit, so the daemon thread effectively behaves like a normal thread.

The lesson here is that marking a thread as daemon is only part of the story. How a daemon thread is used throughout your application is just as important, and you need to keep that in mind. Another lesson is to avoid calls that can block forever, such as Thread#join(). I know you designed your application so perfectly that you think it will never block, but if something can block in theory, it will block in production sooner or later. That can be quite annoying to customers and support people, so think of those people before blindly implementing blocking code paths. Instead, use bounded alternatives such as Thread#join(long), which take a timeout.
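A minimal sketch of the timeout-based alternative, reusing the counter thread from the example above. The class name TimedJoinSketch and the 3-second timeout are arbitrary choices for illustration. Thread#join(long) waits at most the given number of milliseconds; afterwards Thread#isAlive tells you whether the worker actually finished, and the JVM can exit because only the daemon thread is left.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: wait for a daemon thread, but never forever.
public class TimedJoinSketch {

    // Starts a daemon counter thread that never finishes on its own, waits for it
    // at most timeoutMillis, and reports whether it was still running afterwards.
    static boolean joinWithTimeout(long timeoutMillis) {
        Thread worker = new Thread(() -> {
            int i = 0;
            while (true) {
                System.out.println(i++);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    return; // stop counting if interrupted
                }
            }
        });
        worker.setDaemon(true);
        worker.start();

        try {
            worker.join(timeoutMillis); // bounded wait instead of join()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return worker.isAlive();
    }

    public static void main(String[] args) {
        boolean stillRunning = joinWithTimeout(3000);
        System.out.println("worker still running: " + stillRunning);
        // Printed after at most ~3 s; main then returns and, since only the
        // daemon thread remains, the JVM exits.
        System.out.println("end");
    }
}
```

What to do when the timeout expires (log it, interrupt the worker, retry) depends on your application; the point is that the caller regains control.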

Thread.sleep vs. Object.wait

In my last 2 posts (here and here), I talked about some simple but overlooked topics in Java threading. I want to wrap up this series by pointing out yet another easily overlooked topic related to threading: the subtle difference between Thread.sleep and Object.wait.

In Java, both Thread.sleep and Object.wait can make the current thread wait for a specified amount of time (Object.wait(long) may also return earlier if another thread calls notify or notifyAll on the same object). This is useful when the current thread needs to wait for some other thread before it can proceed. I sometimes see developers use the two interchangeably in code, but this can be problematic because the methods behave quite differently.

Consider the example from my previous post. In that example, thread1 acquired a lock and slept for a minute while thread2 was blocked waiting for the same lock. It used TimeUnit.MINUTES.sleep(1), which calls Thread.sleep, to make thread1 wait. Let’s change it to use Object.wait and see what happens:

import java.util.concurrent.TimeUnit;

public class WaitReleasesLock {
    public static void main(String[] args) {
        test();
    }

    private static void test() {
        final Object lock = new Object();

        Thread thread1 = new Thread(new Runnable() {
            @Override public void run() {
                synchronized (lock) {
                    System.out.println("Thread1 acquired lock");
                    try {
                        // Waits up to 1 minute and releases lock's monitor meanwhile
                        lock.wait(1 * 60 * 1000);
                        //TimeUnit.MINUTES.sleep(1); // would keep the monitor instead
                    } catch (InterruptedException ignore) {}
                }
            }
        });
        thread1.start();

        Thread thread2 = new Thread(new Runnable() {
            @Override public void run() {
                synchronized (lock) {
                    System.out.println("Thread2 acquired lock");
                }
            }
        });
        thread2.start();
    }
}

Once you run this sample, you’ll see the following right away:

Thread1 acquired lock
Thread2 acquired lock

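So what happened? When thread1 called lock.wait, it released the lock and thread2 acquired it right away. Unlike Thread.sleep, which keeps every lock the sleeping thread holds, Object.wait releases the monitor of the object it is called on (here, lock) while the thread waits, and reacquires it before returning; any other locks the thread holds stay held. In some situations this is what you want, in others maybe not, so keep it in mind when deciding which method to use to pause a thread. (Strictly speaking, starting thread1 before thread2 does not guarantee that thread1 gets the lock first, but in this sample it almost always does.)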
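To see the difference without relying on which thread happens to start first, here is a minimal sketch (the class and method names are hypothetical) that runs the same scenario twice. A holder thread pauses for 2 seconds inside synchronized (lock), once with lock.wait and once with Thread.sleep, while a contender thread tries to enter the same block. CountDownLatch is used only to make the holder take the lock first and to detect, within 1 second, whether the contender got in.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: does a paused thread keep its lock?
public class SleepVsWaitSketch {

    // Returns true if a second thread enters synchronized (lock) while the
    // first thread is still pausing inside the same synchronized block.
    static boolean secondThreadGetsLock(boolean useWait) {
        final Object lock = new Object();
        final CountDownLatch holderInside = new CountDownLatch(1);
        final CountDownLatch contenderInside = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            synchronized (lock) {
                holderInside.countDown(); // the holder now owns lock's monitor
                try {
                    if (useWait) {
                        lock.wait(2000);    // releases lock's monitor while waiting
                    } else {
                        Thread.sleep(2000); // keeps lock's monitor while sleeping
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread contender = new Thread(() -> {
            synchronized (lock) {
                contenderInside.countDown(); // reached only once the monitor is free
            }
        });

        try {
            holder.start();
            holderInside.await(); // make sure the holder has the lock before contending
            contender.start();
            // Give the contender 1 s, i.e. less than the holder's 2 s pause.
            boolean acquired = contenderInside.await(1, TimeUnit.SECONDS);
            holder.join();
            contender.join();
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the demo", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Object.wait: other thread got lock = " + secondThreadGetsLock(true));
        System.out.println("Thread.sleep: other thread got lock = " + secondThreadGetsLock(false));
    }
}
```

With wait, the contender should get in almost immediately; with sleep, it has to wait until the holder leaves the synchronized block. The 1-second and 2-second values are arbitrary timing margins, so on a heavily loaded machine the first result could in principle differ.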