
Luis Rascão


December 2, 2024

Building Real-Time Distributed Apps with Diagrid Catalyst


Building Real-Time Distributed Applications

Creating an interactive application in which many users collaborate simultaneously on a shared experience is a difficult challenge. It requires low-latency user input and feedback, a shared view of the world, and a system that can scale to support many users. On top of that, operational concerns such as reliability and observability must be addressed. Building a system like this has typically required deep engineering expertise and significant implementation effort.

Introducing Diagrid Catalyst

Catalyst, a serverless platform built on the open-source Dapr project, drastically simplifies building distributed applications by providing APIs that handle many of the complexities for you. These APIs are organized into Building Blocks that encapsulate common distributed system patterns, such as service-to-service messaging, asynchronous messaging, and state management. The Building Blocks use a pluggable Component model that lets you use your preferred infrastructure providers without coupling your code to their APIs and SDKs. The Catalyst APIs are exposed over HTTP and gRPC and work with the existing Dapr SDKs, which are available for most popular programming languages. As a result, you can focus on your application logic and leave the complexities of distributed systems to Catalyst.

Project Overview: A Real-Time Canvas Drawing App

To showcase the power of Catalyst, I have built a real-time, collaborative drawing web app inspired by the popular Reddit r/place event. This massive event occurs sporadically, and its mechanics are straightforward:

  • Each user can place a single colored pixel onto the canvas at most once every 5 minutes.
  • You are allowed to overwrite existing pixels with your own.
  • When the event ends, the entire canvas gets blanked out.

The objective is to cooperate with other users to draw something recognizable before someone else sabotages your efforts.

Similar to the original Reddit event, this app allows multiple users to draw on a shared canvas, one pixel at a time. Each user selects a color and then places a single pixel at a chosen position. Because all users are editing the same canvas, they need immediate feedback and a consistent view of every pixel. Users must also wait out a short cooldown between pixel placements. Here’s a time-lapse of the 2023 Reddit event.

The goal of this project is to demonstrate how Catalyst’s APIs can be used to implement such an application simply.

I’ll walk you through the project requirements, the solution architecture, and how I’ve been able to utilize Diagrid Catalyst to rapidly build it.

By the end, you’ll have a clear understanding of why you should consider Catalyst next time you need to build a distributed application and how it can improve your developer productivity.

From Architecture to Implementation: Why Choose Diagrid Catalyst?

Project Requirements

Before diving into the details of the architecture or implementation, let’s outline the high-level requirements I’ve identified for building our collaborative drawing app’s minimum viable product.

  1. Allow all users to view a consistent shared canvas that updates dynamically without a page refresh.
  2. Allow users to place their own colored pixels on the canvas and have them persisted.
  3. Enforce a cooldown period between pixel updates.

Solution Architecture

Reviewing each of the requirements, I identified the following system behaviors:

  1. Real-time browser updates of pixels to allow the canvas to re-render.
  2. Reliable, consistent and persistent storage to maintain the collective state of the canvas.
  3. User session management to handle cooldown periods.
  4. Event broadcasting for when a user adds a new pixel.

At a high level, we can start to model a system that could satisfy these requirements.

  1. WebSockets to send and receive real-time updates in our web app.
  2. A durable state store to persist the shared canvas and cooldown data.
  3. A pub/sub system to asynchronously notify all users of new pixel updates.

I’ve drawn the model below and annotated it with the expected flow:

So now we roughly know which components make up our solution architecture:

  • A real-time web app for users to interact with the system
  • A backend service hosting an API that provides the web app with the necessary functionality
  • One or more State Stores for persisting data
  • A Pub/Sub component to broadcast new pixel updates to many subscribers

Using the Dapr SDKs with the Catalyst APIs, we can implement this architecture without needing to know exactly which infrastructure provides the State Store or Pub/Sub. The APIs abstract away the underlying implementation, so the provider can be changed through configuration alone. You can read more about these concepts in the Dapr documentation.
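To make the "configuration alone" point concrete, here is a minimal sketch of how component names might be resolved at startup. The `Config` shape mirrors the fields the later snippets reference (`StateStore.Name`, `Cooldown.Name`, `Cooldown.TTL`, `PubSub.Name`, `PubSub.Topic`), but the environment variable names and default values are hypothetical and not taken from the project.

```go
package main

import (
	"fmt"
	"os"
)

// Config mirrors the fields referenced by the snippets in this article.
// The environment variable names used in LoadConfig are hypothetical.
type Config struct {
	StateStore struct{ Name string }
	Cooldown   struct{ Name, TTL string } // TTL is in seconds, as a string
	PubSub     struct{ Name, Topic string }
}

// getenv returns the value of key, or def when the variable is unset or empty.
func getenv(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// LoadConfig resolves component names at startup, so pointing the app at a
// different state store or broker is a deployment change, not a code change.
func LoadConfig() Config {
	var cfg Config
	cfg.StateStore.Name = getenv("STATESTORE_NAME", "kvstore")
	cfg.Cooldown.Name = getenv("COOLDOWN_STORE_NAME", "kvstore")
	cfg.Cooldown.TTL = getenv("COOLDOWN_TTL_SECONDS", "300") // 5 minutes
	cfg.PubSub.Name = getenv("PUBSUB_NAME", "pubsub")
	cfg.PubSub.Topic = getenv("PUBSUB_TOPIC", "pixels")
	return cfg
}

func main() {
	cfg := LoadConfig()
	fmt.Printf("state store %q, pubsub %q, topic %q\n",
		cfg.StateStore.Name, cfg.PubSub.Name, cfg.PubSub.Topic)
}
```

Because the code only ever sees a component name, swapping the provider behind that name in Catalyst leaves this code untouched.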

Implementation

Frontend

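The frontend is a basic one-page HTML document with some embedded JavaScript that connects to the backend over a WebSocket, sends the user’s pixel placements, and renders pixel updates on the canvas.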

Here’s a small snippet of how a pixel placed by a user is rendered on the canvas:

   // `protocol` ("ws" or "wss") is defined earlier in the page script (not shown)
   console.log('connecting to websocket at', window.location.host, protocol);
   ws = new WebSocket(`${protocol}://${window.location.host}/ws`);

   ...

   // handle events pushed by the backend
   ws.onmessage = function(event) {
     console.log('got event', event);
     const op = JSON.parse(event.data);
     switch (op.type) {
       ...
       case "put": {
         // op.data is itself a JSON-encoded pixel: {x, y, color}
         const data = JSON.parse(op.data);
         drawPixel(data.x, data.y, data.color);
         break;
       }
     }
   };

Backend Service

Real-time updates via WebSockets

We want our backend service to expose a WebSocket API so that web clients in users’ browsers can send and receive near real-time updates.

For this I have written a simple Go application that listens on an HTTP port and accepts WebSocket connections.

When a new WebSocket request is received, we create a new client to handle the connection and invoke our application logic. This is the API through which a user’s pixel changes are sent and changes made by other users are received.

Code snippet: Accepting WebSocket client connections

// add a handler to a previously created HTTP router that upgrades
// the connection to WebSocket when `/ws` is served
r.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("Error upgrading connection to WebSocket:", err)
		return
	}

	// client.New creates a client that will handle all commands
	// as well as relaying events.
	c := client.New(conn, dapr, cfg)

	// register the client and read the total while still holding the
	// lock, so `clients` is never read concurrently with a write
	mu.Lock()
	clients = append(clients, c)
	total := len(clients)
	mu.Unlock()
	slog.Info("client connected", "remoteaddr", conn.RemoteAddr(), "total", total)

	// handle all requests coming in from this client
	c.Handle(r.Context())
})
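The handler above appends to a shared `clients` slice guarded by `mu`, but the snippet does not show clients being removed when they disconnect. One way to package that bookkeeping is a small registry type. The `hub` and `sender` names below are hypothetical, and the sketch sticks to the standard library, with a fake client standing in for a real WebSocket connection.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// sender is the only capability the hub needs from a connected client.
type sender interface {
	Send(msg string) error
}

// hub tracks connected clients; mu guards the map.
type hub struct {
	mu      sync.Mutex
	clients map[sender]struct{}
}

func newHub() *hub { return &hub{clients: make(map[sender]struct{})} }

// Add registers a client and returns the new total.
func (h *hub) Add(c sender) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.clients[c] = struct{}{}
	return len(h.clients)
}

// Remove unregisters a client, e.g. once its connection handler returns.
func (h *hub) Remove(c sender) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.clients, c)
}

// Broadcast sends msg to every client and reports how many sends failed.
// It holds the lock while sending, which serializes writes per connection.
func (h *hub) Broadcast(msg string) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	failed := 0
	for c := range h.clients {
		if err := c.Send(msg); err != nil {
			failed++
		}
	}
	return failed
}

// fakeClient records messages and can be told to fail.
type fakeClient struct {
	got  []string
	fail bool
}

func (f *fakeClient) Send(msg string) error {
	if f.fail {
		return errors.New("connection closed")
	}
	f.got = append(f.got, msg)
	return nil
}

func main() {
	h := newHub()
	a, b := &fakeClient{}, &fakeClient{fail: true}
	h.Add(a)
	h.Add(b)
	fmt.Println("failed sends:", h.Broadcast("hello")) // failed sends: 1
	h.Remove(b)
	fmt.Println("failed sends:", h.Broadcast("again")) // failed sends: 0
}
```

In the handler, a `defer h.Remove(c)` placed before `c.Handle(r.Context())` would unregister the client once its request loop ends. Holding the lock during sends mirrors the subscriber callback shown later; with many clients, a per-client send queue would keep one slow connection from stalling the rest.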

Persisting data using Catalyst’s state API

Once we’ve received the user’s pixel updates, we can leverage Dapr’s state management API to easily persist the data in a durable state store. The Dapr abstraction provides us with enough flexibility to be useful but guards us against the idiosyncrasies of the underlying state store implementation.

Code snippet: Persisting a pixel’s state

...
// handle a client `put` command; it contains the pixel information
// (i.e. color, x and y coordinates)
switch req.Type {
case RequestTypePut:
	p := pixel.New()
	if err := p.Unmarshal([]byte(req.Data)); err != nil {
		slog.Error("error unmarshalling pixel", "err", err)
		break // exit the switch, skipping the save and broadcast
	}
	slog.Info("received put pixel request", "pixel", p)

	data := PixelMetadata{
		Pixel: p,
		User:  u,
	}
	if err := c.savePixel(ctx, u.Name, data); err != nil {
		slog.Error("error saving pixel", "err", err)
		break
	}

	// broadcast the event to all clients
	if err := c.broadcast(ctx, data); err != nil {
		slog.Error("error broadcasting pixel", "err", err)
		break
	}
...

func (c *client) savePixel(ctx context.Context, username string, data PixelMetadata) error {
	jsonData, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("error marshalling pixel data: %w", err)
	}
	...
	// one key per canvas coordinate, e.g. "c10_20" for x=10, y=20
	key := fmt.Sprintf("c%d_%d", data.GetX(), data.GetY())
	if err := c.dapr.SaveState(ctx, c.cfg.StateStore.Name, key, jsonData, nil); err != nil {
		return fmt.Errorf("error saving pixel data: %w", err)
	}
	...

Notice how the pixel is saved to a state store simply by referencing a component name from the configuration value c.cfg.StateStore.Name. Because the component is referenced by name, I can write and test code against a provider like the free Catalyst Managed KVStore, and I can also switch to an entirely different state store without changing the application code; only the component definition in Catalyst would change.
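Because `savePixel` derives the key from the coordinates alone (`c%d_%d`), a later pixel at the same position replaces the earlier one, which is exactly the overwrite rule from the original event. The sketch below illustrates that behavior with a plain map standing in for the state store; `memStore` and `pixelKey` are hypothetical helpers, not Dapr APIs.

```go
package main

import "fmt"

// pixelKey reproduces the key format used by savePixel: one key per coordinate.
func pixelKey(x, y int) string {
	return fmt.Sprintf("c%d_%d", x, y)
}

// memStore is a hypothetical in-memory stand-in for a key/value state store.
type memStore map[string][]byte

// Save writes value under key, replacing any previous value (last write wins).
func (m memStore) Save(key string, value []byte) {
	m[key] = value
}

func main() {
	store := memStore{}
	store.Save(pixelKey(10, 20), []byte(`{"x":10,"y":20,"color":"red"}`))
	store.Save(pixelKey(10, 20), []byte(`{"x":10,"y":20,"color":"blue"}`))
	store.Save(pixelKey(1, 2), []byte(`{"x":1,"y":2,"color":"green"}`))

	fmt.Println(len(store))                      // 2
	fmt.Println(string(store[pixelKey(10, 20)])) // {"x":10,"y":20,"color":"blue"}
}
```

The underscore separator matters: without it, the coordinates (1, 23) and (12, 3) would both map to the key `c123`.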

I also enforce a cooldown on how often a user is allowed to place a pixel on the canvas. Again, I can use Dapr’s state API to persist this, in the same or a different named component.

	// build a unique key for this user
	cooldownKey := cooldownUserKey(username)

	// the stored value is the TTL itself; the cooldown check relies on
	// whether the key exists
	cooldownJsonData, err := json.Marshal(c.cfg.Cooldown.TTL)
	if err != nil {
		return fmt.Errorf("error marshalling cooldown data: %w", err)
	}

	// ask the state store to delete the key after the configured number of seconds
	metadata := map[string]string{"ttlInSeconds": c.cfg.Cooldown.TTL}
	if err := c.dapr.SaveState(ctx, c.cfg.Cooldown.Name, cooldownKey, cooldownJsonData, metadata); err != nil {
		return fmt.Errorf("error saving cooldown data: %w", err)
	}

Dapr's SaveState call accepts a ttlInSeconds metadata entry that defines how long this piece of data will persist. This is ideal for implementing cooldowns: placing a new pixel is blocked while this key exists, and once it expires and is removed, you're free to place another one.

Dapr's abstraction is excellent, but it has limitations: not every state store component supports the ttlInSeconds metadata. Be sure to review each state store component's supported features before choosing your backing infrastructure.

Broadcasting pixel updates via Catalyst’s Pub/Sub

For each pixel update, we want to notify all other users so that their canvases can be updated. Dapr’s Pub/Sub API, backed by Catalyst, makes this easy: when we receive an update, we publish it to a Pub/Sub topic, and every subscriber receives it (fan-out).

Code snippet: Broadcasting pixel updates

func (c *client) broadcast(ctx context.Context, data PixelMetadata) error {
	jsonData, err := json.Marshal(data.Pixel)
	if err != nil {
		return fmt.Errorf("error marshalling pixel data: %w", err)
	}

	// publish the event data onto a configurable topic name (`c.cfg.PubSub.Topic`)
	// that resides in the pubsub component (`c.cfg.PubSub.Name`)
	if err := c.dapr.PublishEvent(ctx,
		c.cfg.PubSub.Name,
		c.cfg.PubSub.Topic,
		jsonData); err != nil {
		return fmt.Errorf("error broadcasting pixel data: %w", err)
	}

	return nil
}

Similar to the Dapr State Store mentioned earlier, I'm simply publishing an event to a named component. During development, I'm utilizing the free Catalyst Managed Pub/Sub. However, I have the flexibility to switch to alternative implementations like RabbitMQ or AWS SNS/SQS in the future if needed.
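Fan-out means every subscriber receives every published message. The sketch below models that with Go channels in a single process. `topic` is a hypothetical type and says nothing about how Catalyst's Managed Pub/Sub is implemented, but it shows why each backend replica, and therefore every connected browser, sees each pixel.

```go
package main

import (
	"fmt"
	"sync"
)

// topic is a hypothetical in-process pub/sub topic with fan-out delivery.
type topic struct {
	mu   sync.Mutex
	subs []chan []byte
}

// Subscribe returns a channel that receives every message published afterwards.
func (t *topic) Subscribe(buffer int) <-chan []byte {
	ch := make(chan []byte, buffer)
	t.mu.Lock()
	t.subs = append(t.subs, ch)
	t.mu.Unlock()
	return ch
}

// Publish delivers msg to every subscriber (fan-out), blocking if a
// subscriber's buffer is full.
func (t *topic) Publish(msg []byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.subs {
		ch <- msg
	}
}

func main() {
	var t topic
	replicaA := t.Subscribe(1)
	replicaB := t.Subscribe(1)

	t.Publish([]byte(`{"x":1,"y":2,"color":"red"}`))

	// each replica would now relay the pixel to its own WebSocket clients
	fmt.Println(string(<-replicaA))
	fmt.Println(string(<-replicaB))
}
```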

Receiving pixel events via Catalyst’s Pub/Sub

As we've seen, users' pixel placements are published to a pubsub component. These events are then relayed to all interested subscribers, who can handle them as needed. In our case, when we receive a pixel event, we broadcast it to all currently connected WebSocket clients.

Code snippet: Handling pixel events

	// start the subscriber that will handle external events
	if err := subscriber.Start(ctx, dapr, &cfg,
		// every pixel event is handled in the func below
		func(p pixel.Pixel) error {
			data, err := p.Marshal()
			if err != nil {
				return fmt.Errorf("error marshaling pixel: %w", err)
			}

			// broadcast event to all websocket clients in this replica
			mu.Lock()
			defer mu.Unlock()
			slog.Info("broadcasting pixel to clients", "pixel", p, "#clients", len(clients))

			for _, c := range clients {
				// check the error returned by Send itself, not the earlier err
				if err := c.Send(client.Event{
					Type: client.EventTypePut,
					Data: string(data),
				}); err != nil {
					slog.Error("error sending pixel to client", "err", err)
				}
			}

			return nil
		}); err != nil {
		slog.Error("error starting subscriber", "err", err)
		return
	}

Code snippet: subscriber internals

		// use Dapr's Go SDK to subscribe to the configured pubsub/topic
		subscription, err := client.Subscribe(ctx,
			daprsdk.SubscriptionOptions{
				PubsubName: cfg.PubSub.Name,
				Topic:      cfg.PubSub.Topic,
			})

		...

		// loop through all messages that arrive through the subscription,
		// unmarshal them and pass them to the handler func.
		for {
			select {
			case <-ctx.Done():
				log.Println("Shutting down subscriber gracefully...")
				return nil
			default:
				msg, err := subscription.Receive()
				if err != nil {
					slog.Error("error receiving message", "err", err)
					continue
				}
				if msg == nil {
					continue
				}

				// Process the event
				p := pixel.New()
				if err := p.Unmarshal(msg.RawData); err != nil {
					return fmt.Errorf("error unmarshaling pixel: %w", err)
				}
				slog.Info("received pixel event", "pixel", p)

				if err := fn(p); err != nil {
					return fmt.Errorf("error handling pixel: %w", err)
				}
				...

Recap

With minimal code, we built a functional (albeit simple) prototype of a collaborative drawing application by leveraging off-the-shelf components that encapsulate significant complexity. Throughout the project, I kept my focus on the business logic, avoiding the cognitive burden of learning numerous SDKs as well as premature commitment to a single provider. This prototyping phase is about rapid development and iteration to learn whether your solution is sufficient, rather than about assessing the details of your infrastructure.

Conclusion

As discussed in this article, building distributed systems often involves many moving pieces and can be a daunting task for any developer. That complexity leads to slow development cycles, increased maintenance costs, and wasted effort on work that does not deliver value to your end users. With Catalyst, we’ve been able to dramatically simplify our code while reducing the risk of defects and vulnerabilities when implementing common patterns.

We’ve walked through the process of building a real-time collaborative drawing app inspired by the Reddit r/place event, and shown how Catalyst simplifies its development challenges. On top of this, the Catalyst platform provides reliability and out-of-the-box observability through logs, metrics and traces to ensure you and your team can easily operate and monitor your live system.

The Call Graph in Catalyst showing the application and components.
API Logs in Catalyst showing the requests that interact with the state store and message broker.

Key Takeaways

  • Faster Development: Catalyst's abstractions remove the burden of dealing with low-level infrastructure and learning specific SDKs, allowing you to build applications faster.
  • Simplicity and Consistency: A single programming model for all your distributed concerns means less risk of vulnerabilities, defects, and knowledge gaps.
  • Scalability and Reliability: Catalyst’s building blocks are designed for resilience and scalability, making it easy to extend your application as it grows.

Whether you’re building a small prototype or a production-scale distributed system, Catalyst provides the tools and patterns to make complex architectures simple. If you’re looking for a way to streamline your distributed development, I encourage you to explore Catalyst and see how it can help bring your ideas to life faster.

Next Steps:

  • If you don’t have an account yet, sign up for Catalyst and try it out!
  • Check out the Diagrid Catalyst documentation to explore more use cases.
  • Try implementing additional features in the canvas app, such as collaborative drawing tools, user authentication, or even a leaderboard. All source code is available on GitHub and can be run in a devcontainer.

Happy building!