[book note] Manning MEAP: Streaming Data – Collection Tier

Common interaction patterns

Request / response


Request / acknowledge

Stream

The stream interaction style is quite different from all of the others we've talked about thus far. With all of the other patterns a client makes a request to a service that may or may not return a response. With this pattern we flip things around: the service becomes the client.

[Figure 2.9: the request/response pattern (top) compared with the stream pattern (bottom)]

There are a couple of important distinctions to point out when comparing the previous patterns (all the request/response optional patterns) with the stream pattern:

  • With the request/response style of interaction as depicted at the top of figure 2.9, the client pushes data to the service in the request and the service may respond. This response is grayed out in the diagram, because the response is not required by some variations of this pattern. It boils down to a single request resulting in zero or one response. The stream pattern as depicted at the bottom of figure 2.9 is quite different; a single request results in no data or a continual flow of data as a response.
  • The second difference between the request/response optional patterns and the stream pattern is who initiates the data movement:
    • With the request/response optional patterns, a client external to the streaming system pushes the message to it. In our previous examples this was a web browser, a car, or a phone: all clients that send a message to our collection tier.
    • With the stream pattern, our collection tier connects to a stream source and pulls data in. For example, you may be interested in building a streaming system to do sentiment analysis of tweets; to do so you'd build a collection tier that establishes a connection to Twitter and consumes the stream of tweets.

// Listing 2.1 Example JSON stream event (the numeric values were missing from these notes; placeholders shown)
{
  "venue": {
    "venue_name": "Chicago Symphony Center",
    "lon": -87.62,
    "lat": 41.88,
    "venue_id": 12345678
  },
  "visibility": "public",
  "response": "yes",
  "member": {
    "member_id": 87654321,
    "member_name": "Rifat"
  }
}
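Listing 2.1 shows the kind of event a collection tier would pull in from a stream source. To make the pull-based nature of this pattern concrete, here is a minimal sketch (mine, not the book's code) of a collection node opening a long-lived HTTP connection to a streaming endpoint and reading one JSON event per line; the endpoint URL and what we do with each event are assumptions for illustration.

// Sketch (not from the book): a collection node pulling events from a streaming HTTP endpoint
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class StreamPullingCollector {
  public static void main(String[] args) throws Exception {
    // Hypothetical endpoint that keeps the connection open and emits one JSON event per line
    URL streamUrl = new URL("http://stream.example.com/rsvps");
    HttpURLConnection connection = (HttpURLConnection) streamUrl.openConnection();
    connection.setReadTimeout(0); // 0 = no read timeout; the stream is continuous

    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8))) {
      String event;
      while ((event = reader.readLine()) != null) {
        // A real collection node would hand the raw event to the next stage here
        // (see the buffering discussion later in this chapter).
        System.out.println(event);
      }
    }
  }
}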

Scaling the interaction patterns

Request/response optional

If you remember, in chapter 1 we talked about horizontal scaling being our overall goal for every tier of our streaming system. With this example and our use of the request/response optional pattern, horizontal scaling will work very well for two reasons:

  • First, with this pattern we don't have any state information about the client making the request, which means that a client can connect and send a request to any service instance we have running.
  • Second — and this is a result of the stateless nature of this pattern — we can easily add new instances of this service without changing anything about the currently running instances.

This mode of scaling stateless services is so popular that many cloud hosting providers, such as Amazon, provide a feature called auto-scaling that will automatically increase or decrease the number of running instances based on demand.

On top of horizontal scaling we also want our service to be stateless, which will allow any vehicle to make a request to any instance of our service at any time. This stateless trait is commonly found in systems that use this pattern. Taking horizontal scaling and statelessness into consideration, we arrive at figure 2.10, which shows these two aspects together.
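To make the statelessness concrete, the following sketch (mine, not the book's code) shows a collection endpoint that accepts a vehicle's request and immediately acknowledges it without remembering anything about the client, so any instance behind the load balancer can serve the next request equally well; the port and path are arbitrary choices for illustration.

// Sketch (not from the book): a stateless request/acknowledge collection endpoint
import com.sun.net.httpserver.HttpServer;
import java.net.InetSocketAddress;

public class StatelessCollectionEndpoint {
  public static void main(String[] args) throws Exception {
    HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
    server.createContext("/collect", exchange -> {
      byte[] message = exchange.getRequestBody().readAllBytes(); // the vehicle's message
      // Hand the message off for processing; no per-client state is kept, so any
      // instance behind the load balancer can handle this client's next request.
      exchange.sendResponseHeaders(202, -1); // 202 Accepted, empty response body
      exchange.close();
    });
    server.start();
  }
}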

[Figure 2.10: horizontally scaled, stateless collection service instances behind a load balancer]

We're using a load balancer here to route requests from the vehicles to a running instance of our service. As instances are started or stopped based on demand, the set of instances the load balancer routes requests to will change.

Scaling the stream pattern

[Figure 2.11: a single persistent stream connection leaves three of the four collection nodes idle]

Previously we mentioned that horizontal scaling is our goal when building each tier of our streaming system. With many streaming protocols, there's a direct and persistent connection between the client (our collection tier) and the server (the service we request data from).

In figure 2.11 you can see that three of the four nodes are idle because there's a direct connection between the search stream and the node handling the stream. To scale our collection tier we have a couple of options:

  • scaling up the collection node that's consuming the stream, and
  • introducing a buffering layer in the collection tier.

These options are not mutually exclusive; depending on the volume and velocity of the stream, both may be required. Scaling up the node consuming the stream will only get us so far: at a certain point we'll reach the limits of the hardware the collection node is running on and won't be able to scale it up any further.

[Figure: a buffering layer between the stream-consuming node and the rest of the collection tier]

The key to being able to put a buffering layer in the middle lies in making sure that no business logic is performed on the messages when they're consumed from the stream.

  • Instead, they should be consumed from the stream and as quickly as possible pushed to the buffering layer.

A separate set of collection nodes that may perform business logic on the messages will then consume the messages from the buffering layer.

  • The second set of collection nodes can now also be scaled horizontally.
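Putting those points together, a minimal sketch (mine, not the book's code) of this split might look like the following. An in-process BlockingQueue stands in for what would really be an external buffering technology, and readNextEventFromStream and process are hypothetical placeholders.

// Sketch (not from the book): keep the stream reader free of business logic
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BufferedCollectionTier {
  // Stand-in for an external buffering layer (in reality a distributed log or queue)
  private static final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(100_000);

  public static void main(String[] args) {
    // Node 1: consume the stream and push to the buffer as quickly as possible
    Thread streamReader = new Thread(() -> {
      try {
        while (true) {
          String rawEvent = readNextEventFromStream(); // hypothetical stream client
          buffer.put(rawEvent);                        // no enrichment, filtering, or routing here
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    // Nodes 2..N: pull from the buffer and apply business logic; these scale horizontally
    Thread worker = new Thread(() -> {
      try {
        while (true) {
          process(buffer.take()); // enrichment, filtering, routing, etc.
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });

    streamReader.start();
    worker.start();
  }

  private static String readNextEventFromStream() { return "{}"; } // placeholder
  private static void process(String rawEvent) { /* business logic lives here */ }
}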

Fault tolerance

Our overarching goal is that when a collection node crashes (and it will) we do not lose data and can recover as if the crash had never occurred.

The two primary approaches to implementing fault tolerance, checkpointing and logging, are designed to protect against data loss and enable speedy recovery of the crashed node.

First, let's consider checkpointing. There are a variety of checkpoint-based protocols in the literature to choose from, but when you boil them down, the following two characteristics can be found in all of them:

  • Global snapshot — They require that a snapshot of the global state of the whole system be regularly saved to storage somewhere, not just the state of the collection tier.
  • Potential for data loss — They only ensure that the system is recoverable up to the most recent recorded global state; any messages that were processed and generated afterward are lost.

For the global snapshot, this means we're able to capture the entire state of all data and computations, from the collection tier through the data access tier, and save it to a durable persistent store.

  • This is what we're talking about when we refer to the global state of the system. This state is then used during recovery to put the system back into the last known state.
  • The potential for data loss exists if we can't capture the global state every time data is changed in the system.

When considering using a checkpoint protocol for implementing fault tolerance in a streaming system, you have to keep two things in mind:

  • the implications of the previously mentioned attributes, and the fact that a streaming system is composed of many layers and many different technologies.
    • This layering and the constant movement of data make it very hard to consistently capture a global snapshot at a point in time.
    • This makes checkpointing a poor choice for a streaming system.
  • Checkpointing is a valid choice, however, if you're building the next version of HDFS or perhaps a new NoSQL data store.

Turning our attention to the logging protocols, you have a variety to choose from. Reducing them to their essence, you'll find that they all share the common goals of overcoming the expense and complexity of checkpointing and providing the ability to recover up to the last message received before a crash.

In the end, the goals of the logging technique manifest themselves in the basic idea that underpins all logging techniques: if a message can be replayed, the system can reach a globally consistent state without the need for a global snapshot.

This means that each tier in the system independently records all messages it receives and plays them back after a crash.

  • Implementing a logging protocol frees us from worrying about maintaining global state, enabling us to focus on how to add fault tolerance to the collection tier.
  • To do this we're going to discuss two classic techniques, receiver-based message logging (RBML) and sender-based message logging (SBML), and an emerging technique called hybrid message logging.

[Figure 2.14: a collection node receiving a message, applying logic to it, and sending it to the next tier]

Figure 2.14 shows a single collection tier node that's receiving a message, performing some logic on it, and then sending it to the next tier. As their names imply:

  • receiver-based logging is concerned with protecting the data the node is receiving and
  • sender-based logging is concerned with protecting the data that's going to be sent to the next tier.

Receiver-based message logging

The RBML technique involves synchronously writing every received message to stable storage before any action is taken on it. By doing so, we ensure that if our software crashes while handling a message, we already have it saved, and upon recovery we can replay it.

[Figure: RBML message flow through a collection node]

  1. A message is sent from a data producer (any client).
  2. A new piece of software we wrote for the collection node, called the RBML logger, gets the message from the data producer and sends it to storage.
  3. The message is written to stable storage.
  4. The message then proceeds through to any other logic we have in the node; perhaps we want to enrich the data we are collecting, filter it, and/or route it based on business rules. The important aspect is we are recording the data as soon as it is received and before we do anything to it.
  5. The message is then sent to the message queueing tier, the next tier in the streaming system.
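A minimal sketch of this flow (mine, not the book's implementation): every message is synchronously appended to stable storage the moment it arrives, and only after that write succeeds does the rest of the node's logic see it. A local file plays the role of stable storage purely for illustration.

// Sketch (not from the book): receiver-based message logging
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RbmlLogger {
  private final Path receivedLog; // stand-in for stable storage

  public RbmlLogger(Path receivedLog) {
    this.receivedLog = receivedLog;
  }

  // Steps 2 and 3: persist the message synchronously before anything else touches it.
  public void onMessageReceived(String message) throws IOException {
    Files.write(receivedLog,
        (message + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.SYNC);
    handle(message); // step 4: only now enrich, filter, or route the message
  }

  // Package-private so a recovery path can replay messages through the same logic.
  void handle(String message) {
    // business logic, then step 5: send to the message queueing tier
  }
}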

It's important to point out that, depending on the type of stable storage used, steps 2 and 3 have the potential to negatively impact the throughput of our collection node.

  • Sometimes in the literature you'll see this called out as one of the drawbacks of logging protocols.

[Figure 2.16: RBML recovery after a collection node crash]

In figure 2.16 there are a couple of things to call out.

  • First, once the crash occurs, all incoming messages to this collection node are stopped. Because you'll have more than one collection node and they'll be behind a load balancer, you would take this node out of rotation.
  • Next, the RBML logger reads the messages that have not yet been processed from stable storage and sends them through the rest of the node logic as if nothing had happened (a sketch of this replay follows).
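The sketch below (again mine, not the book's code) shows that replay reusing the handling path from the RbmlLogger sketch above; it assumes a second log that records which messages were fully processed, and everything logged on receipt but absent from that log is replayed.

// Sketch (not from the book): RBML recovery by replaying unprocessed messages
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

public class RbmlRecovery {
  public static void recover(Path receivedLog, Path processedLog, RbmlLogger node) throws IOException {
    Set<String> alreadyProcessed = Files.exists(processedLog)
        ? new HashSet<>(Files.readAllLines(processedLog))
        : new HashSet<>();
    for (String message : Files.readAllLines(receivedLog)) {
      if (!alreadyProcessed.contains(message)) {
        node.handle(message); // through the rest of the node logic, as if nothing had happened
      }
    }
  }
}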

Sender-based message logging

The SBML technique involves writing the message to stable storage before it is sent.

  • If you think of RBML as logging all the messages coming into our collection node, SBML is the act of logging all outgoing messages before we send them, protecting us against the next tier crashing or a network interruption.

[Figure 2.17: SBML message flow, including the acknowledgement from the message queueing tier]

Besides this nuance, there's a little wrinkle we'll need to deal with during recovery: how do we know whether the next tier has already processed the message we're replaying? There are several ways to handle this.

  • One way, shown in figure 2.17, is to use a message queueing tier that returns an acknowledgement that it received the message.
    • With the acknowledgement in hand, we can either mark the message as replayed in stable storage, or we can delete it from stable storage because we don't need to replay it anymore during recovery.
  • If the technology you choose for your message queueing tier doesn't support returning an acknowledgement of any sort, you may be forced into a simpler scheme: as long as no error occurs when sending the message to the message queueing tier (steps 6 and 7), you delete the message from stable storage. A rough sketch of this approach follows.
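In this sketch (not the book's code) the outgoing message is written to stable storage before it is sent, and when the message queueing tier acknowledges it the entry is cleared so it won't be replayed during recovery. The message-ID scheme and the queueing-tier client are assumptions.

// Sketch (not from the book): sender-based message logging with acknowledgements
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SbmlLogger {
  private final Path outboundLog;                                        // stand-in for stable storage
  private final Map<String, String> pending = new ConcurrentHashMap<>(); // messageId -> message

  public SbmlLogger(Path outboundLog) {
    this.outboundLog = outboundLog;
  }

  // Log the outgoing message to stable storage, then send it to the next tier.
  public void send(String messageId, String message) throws IOException {
    Files.write(outboundLog,
        (messageId + "\t" + message + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
        StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.SYNC);
    pending.put(messageId, message);
    sendToMessageQueueingTier(messageId, message); // hypothetical client for the next tier
  }

  // Called when the message queueing tier acknowledges receipt; the message no longer
  // needs to be replayed, so mark it as replayed (or delete it) in stable storage.
  public void onAcknowledged(String messageId) {
    pending.remove(messageId);
    // also mark or remove the corresponding record in the outbound log
  }

  private void sendToMessageQueueingTier(String messageId, String message) {
    // e.g., publish to whatever queueing technology the next tier uses
  }
}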

[Figure: SBML recovery]

Hybrid message logging

As we discussed, writing to stable storage can negatively impact our collection node's performance. Implementing both RBML and SBML means we're writing to stable storage at least twice during normal execution.

[Figure: RBML and SBML used together, writing to stable storage twice]

To help with this, an emerging technique called hybrid message logging (HML) has been designed to take the best parts of RBML and SBML at the cost of minimal additional complexity. HML is also designed to provide the same data-loss protection and recoverability found in RBML, SBML, and other logging techniques. There are several ways to implement HML; one common approach is illustrated in figure 2.20.

[Figure 2.20: one way to implement hybrid message logging]

Several factors contribute to this simplification.

  • The first one, which may not come as a surprise, is that the two stable storage instances have been consolidated.
    • This is a minor change, but it allows you to reduce the number of moving parts.
  • The second change, writing to stable storage asynchronously, is more subtle, but arguably has a more profound impact on implementation complexity and performance.
    • The complexity comes from making sure you correctly handle any errors that happen, and
    • the performance gain comes from leveraging the multi-core world we live in to perform more than one task at a time.
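As a rough sketch of that consolidation (mine, not the book's implementation): one log backs both the inbound and outbound records, and the writes happen on a single background thread so the hot path isn't blocked; the minimal error handling shown is exactly the part the text warns you must get right.

// Sketch (not from the book): hybrid message logging with one store and async writes
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HybridLogger {
  private final Path log;                                                     // single consolidated store
  private final ExecutorService writer = Executors.newSingleThreadExecutor(); // asynchronous writes

  public HybridLogger(Path log) {
    this.log = log;
  }

  public void logReceived(String message) {
    append("IN\t" + message);
  }

  public void logSent(String message) {
    append("OUT\t" + message);
  }

  private void append(String record) {
    writer.submit(() -> {
      try {
        Files.write(log, (record + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
            StandardOpenOption.CREATE, StandardOpenOption.APPEND);
      } catch (IOException e) {
        // The hard part of HML: an asynchronous write failure must surface somewhere,
        // otherwise a crash could lose messages we believed were safely logged.
        e.printStackTrace();
      }
    });
  }
}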

If you're interested in learning more about this protocol, a great article to start with is the one by Meyer, Rexachs, and Luque.

Implementation

The WebSockets API and Netty were chosen for the following reasons: first, WebSockets is a more efficient protocol, as discussed in earlier chapters; and second, we will be using both WebSockets and Netty later in the chapter when we build the Data Access API.

  • This allows us to keep things simple, which is not just an exercise for this chapter but something you should aim for when you build a real streaming pipeline.

Collection Service Data Flow

When building our collection service, we want to take into consideration the following capabilities:

  1. Managing the connection to the Meetup API
  2. Ensuring we do not lose data
  3. Integrating with the Message Queueing Tier

The overall flow of how this fits together is illustrated in figure 9.2.

[Figure 9.2: collection service data flow]

// Listing 9.1 Initialization of HybridMessageLogger
public class CollectionServiceWebSocketClient {
  public static void main(String[] args) throws Exception {
    try {
      HybridMessageLogger.initialize();
    } catch (Exception exception) {
      System.err.println("Could not initialize HybridMessageLogger!");
      exception.printStackTrace();
      System.exit(-1);
    }
  }
}
// Listing 9.8 `HybridMessageLogger` addEvent call
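The body of Listing 9.8 isn't captured in these notes. As a rough idea of where the call sits, the fragment below is my own sketch; the exact signature of addEvent and the surrounding handler are assumptions, not the book's code.

// Sketch (not from the book): persist an incoming event before forwarding it
public void onEvent(String eventJson) throws Exception {
  HybridMessageLogger.addEvent(eventJson); // assumed signature: log first, so a crash can't lose the event
  sendToMessageQueueingTier(eventJson);    // hypothetical helper that publishes to the next tier
}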
