    Hazelcast Transactional Outbox: Guaranteed Delivery

    Part 8 in the “Building Event-Driven Microservices with Hazelcast” series


    Introduction

    In Part 7, we added circuit breakers and retry to protect saga listeners from transient failures on the consumer side. That covers what happens when a service receives an event and can’t process it. But we haven’t talked about what happens when the event never leaves the building.

    Quick refresher on our dual-instance architecture: each service runs an embedded Hazelcast instance for local Jet pipeline processing and a client connected to the shared cluster for cross-service ITopic communication. After the pipeline processes an event, the EventSourcingController republishes it to the shared cluster so saga listeners in other services can react.

    That republish step? It was a fire-and-forget call:

    // The old approach — fragile
    try {
        ITopic<GenericRecord> topic = sharedHazelcast.getTopic(pending.eventType);
        topic.publish(pending.eventRecord);
    } catch (Exception e) {
        logger.warn("Failed to republish event {}: {}", pending.eventType, e.getMessage());
        // Event is permanently lost!
    }
    

    If the shared cluster is unreachable — network partition, cluster restart, someone tripping over the power cable — the event vanishes. The saga never progresses. Eventually the saga timeout detector marks it as failed, but by then the original event data is gone and there’s nothing to retry.

    The Transactional Outbox Pattern fixes this. Instead of publishing directly to the shared cluster, the controller writes the event to a local outbox — an IMap on the embedded Hazelcast instance — and a separate publisher component picks it up and delivers it. If delivery fails, the entry stays in the outbox and gets retried.


    Why Direct Publishing Fails

    The problem is fundamental. Publishing to an external system (the shared cluster) and completing a local operation (the Jet pipeline) are two separate operations that can’t be made atomic.

    [Figure: failure timeline for direct publishing. The Jet pipeline updates the local event store and materialized view, but the publish to the shared cluster ITopic fails on a network partition, and the event is lost with nothing left to retry.]

    The event is safely stored in the local event store and materialized view, but the cross-service notification is lost. You could retry in place, but that blocks the Jet pipeline for all events. You could schedule an async retry, but if the process restarts, that retry state is gone too.

    The outbox pattern trades immediate delivery for guaranteed delivery. Write to a durable local store, deliver asynchronously, retry until it works. It’s the standard solution in event-driven architectures for good reason.
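
    The trade can be sketched in plain Java with no Hazelcast involved. Everything named here (OutboxSketch, Transport, drain) is made up for illustration and is not part of the framework: a LinkedHashMap stands in for the outbox IMap, and a small functional interface stands in for the publish to the shared cluster.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a LinkedHashMap stands in for the outbox IMap,
// and Transport stands in for the publish to the shared cluster.
class OutboxSketch {

    interface Transport {
        void publish(String eventId, String payload) throws Exception;
    }

    private final Map<String, String> pending = new LinkedHashMap<>();

    // Step 1: the write is purely local, so it does not depend on the network.
    void write(String eventId, String payload) {
        pending.put(eventId, payload);
    }

    // Step 2: try to deliver everything; entries that fail stay in the outbox.
    List<String> drain(Transport transport) {
        List<String> delivered = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            try {
                transport.publish(entry.getKey(), entry.getValue());
                it.remove();                      // delivered: drop it from the outbox
                delivered.add(entry.getKey());
            } catch (Exception e) {
                // Leave the entry in place; a later drain() retries it.
            }
        }
        return delivered;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

    A drain() against an unreachable transport delivers nothing and loses nothing. The same entries go out on the next successful pass.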


    Architecture

    [Figure: transactional outbox architecture. The EventSourcingController writes each event to a durable local OUTBOX IMap and signals the OutboxPublisher via a semaphore; the publisher claims pending entries and delivers them to the shared cluster ITopic, retrying on failure.]

    The outbox IMap lives on the embedded Hazelcast instance — the same instance that hosts the event store and materialized views. Writing to it is a local operation. If the embedded instance is up (and it must be, since the pipeline just ran), the outbox write succeeds.


    The OutboxEntry

    Each outbox entry captures everything needed to deliver the event later:

    public class OutboxEntry {
    
        private String eventId;          // Matches the domain event's eventId
        private String eventType;        // ITopic name (e.g., "OrderCreated")
        private GenericRecord eventRecord; // The serialized event to publish
        private int retryCount;          // Delivery attempts so far
        private Status status;           // PENDING, DELIVERED, or FAILED
        private Instant createdAt;       // When the entry was created
        private Instant lastAttemptAt;   // When the last delivery attempt occurred
        private String failureReason;    // Most recent failure message
    
        public enum Status {
            PENDING,    // Awaiting delivery
            DELIVERED,  // Successfully published to shared cluster
            FAILED      // Permanently failed after max retries
        }
    }
    

    The eventRecord field is the full GenericRecord that needs to go to the shared cluster’s ITopic — same record the Jet pipeline produces, complete with saga metadata like sagaId and correlationId.


    OutboxStore: The Interface

    Six methods covering the full lifecycle:

    public interface OutboxStore {
    
        void write(OutboxEntry entry);
    
        List<OutboxEntry> pollPending(int maxBatchSize);
    
        void markDelivered(String eventId);
    
        void markFailed(String eventId, String reason);
    
        void incrementRetryCount(String eventId, String failureReason);
    
        long pendingCount();
    }
    

    Provider-agnostic. The Hazelcast implementation uses an IMap, but the interface could just as easily sit in front of a database table.


    HazelcastOutboxStore

    The Hazelcast implementation stores entries as Compact-serialized GenericRecord values in an IMap:

    public class HazelcastOutboxStore implements OutboxStore {
    
        private static final String SCHEMA_NAME = "OutboxEntry";
        private final IMap<String, GenericRecord> outboxMap;
    
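        public HazelcastOutboxStore(HazelcastInstance hazelcast, MeterRegistry meterRegistry) {
            // DEFAULT_MAP_NAME is declared elsewhere and not shown in this excerpt;
            // what the store does with meterRegistry is omitted here as well.
            this.outboxMap = hazelcast.getMap(DEFAULT_MAP_NAME);
        }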
    }
    

    You might wonder why we’re using GenericRecord instead of storing OutboxEntry Java objects directly. The problem is that OutboxEntry has two Instant fields and a nested GenericRecord, and Hazelcast’s zero-config Compact serialization can handle neither type. We’d need a custom CompactSerializer registered in every Hazelcast instance configuration. Instead, we convert at the boundary:

    static GenericRecord toRecord(final OutboxEntry entry) {
        return GenericRecordBuilder.compact(SCHEMA_NAME)
                .setString("eventId", entry.getEventId())
                .setString("eventType", entry.getEventType())
                .setGenericRecord("eventRecord", entry.getEventRecord())
                .setInt32("retryCount", entry.getRetryCount())
                .setString("status", entry.getStatus().name())
                .setInt64("createdAt", entry.getCreatedAt().toEpochMilli())
                .setNullableInt64("lastAttemptAt",
                        entry.getLastAttemptAt() != null
                                ? entry.getLastAttemptAt().toEpochMilli() : null)
                .setString("failureReason", entry.getFailureReason())
                .build();
    }
    

    A few things going on here. Instant becomes int64 epoch millis — compact, sortable, unambiguous. lastAttemptAt uses setNullableInt64 because it’s null until the first delivery attempt. The nested eventRecord uses setGenericRecord, which Compact handles natively. And status is stored as the enum name string, which makes it readable in Management Center and queryable with Predicates.equal().
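
    The timestamp half of that conversion is easy to try on its own. This is a minimal sketch using only java.time; the InstantMillis class and its method names are invented for illustration and are not the framework’s toRecord/fromRecord helpers.

```java
import java.time.Instant;

// Illustrative helpers for the Instant <-> epoch-millis mapping described above.
class InstantMillis {

    // Timestamps that are always set map to a plain long.
    static long toMillis(Instant instant) {
        return instant.toEpochMilli();
    }

    // Timestamps that may be unset map to a boxed Long so "not set yet" survives.
    static Long toNullableMillis(Instant instant) {
        return instant != null ? instant.toEpochMilli() : null;
    }

    // Reverse direction: null stays null, anything else becomes an Instant.
    static Instant fromNullableMillis(Long millis) {
        return millis != null ? Instant.ofEpochMilli(millis) : null;
    }
}
```

    One thing to keep in mind: epoch millis drop anything finer than a millisecond, so a round-tripped value equals the original truncated to millis, not necessarily the original itself.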

    Polling uses a Hazelcast predicate to filter by status. The matches are then sorted by creation time on the caller’s side, so the oldest entries are delivered first:

    @Override
    public List<OutboxEntry> pollPending(final int maxBatchSize) {
        final Collection<GenericRecord> pending = outboxMap.values(
                Predicates.equal("status", OutboxEntry.Status.PENDING.name()));
    
        return pending.stream()
                .map(HazelcastOutboxStore::fromRecord)
                .sorted(Comparator.comparing(OutboxEntry::getCreatedAt))
                .limit(maxBatchSize)
                .collect(Collectors.toList());
    }
    

    The OutboxPublisher

    The publisher bridges the outbox and the shared cluster. The obvious approach is to poll on a fixed interval — once per second, say — but that adds latency we don’t need. We know exactly when a new entry arrives.

    Event-Driven Wake-Up

    The publisher uses a Semaphore to sleep until someone signals it:

    public class OutboxPublisher {
    
        private final Semaphore wakeUp = new Semaphore(0);
    
        public void notifyNewEntry() {
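            // Keep at most 1 permit so signals don't accumulate. The check and the
            // release are not one atomic step, so concurrent callers can occasionally
            // add an extra permit; the only cost is one spurious wake-up.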
            if (wakeUp.availablePermits() == 0) {
                wakeUp.release();
            }
        }
    
        public boolean waitForWork() {
            try {
                return wakeUp.tryAcquire(
                        properties.getPollInterval().toMillis(),
                        TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
    

    When the EventSourcingController writes an outbox entry, it calls notifyNewEntry() right after. The publisher wakes up, claims all pending entries, delivers them. Under normal conditions, the time from event creation to shared-cluster delivery is sub-millisecond.

    The poll interval (default 1 second) is the safety net. If a signal gets missed — maybe the publisher was busy with a previous batch — the timeout ensures nothing sits around for too long.

    This is a JVM-local semaphore, not a distributed one. That’s fine. When the service scales to multiple replicas with per-service clustering (ADR 013), each replica has its own publisher. The semaphore wakes the local publisher instantly for locally-written events. Events written by other replicas get picked up within the poll interval. The actual coordination — preventing two replicas from delivering the same event — happens in claimPending() via an atomic ClaimEntryProcessor on the IMap.
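
    Here is a self-contained sketch of the wake-up signal that you can run without the rest of the framework. It is a variation, not the framework’s code: WakeUpSignal is a hypothetical class, and instead of checking availablePermits() before releasing, it always releases and lets the waiter drain any extra permits.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Illustrative variation on the wake-up signal: bursts of notifications
// coalesce into a single wake-up because the waiter drains leftover permits.
class WakeUpSignal {

    private final Semaphore wakeUp = new Semaphore(0);
    private final long pollIntervalMillis;

    WakeUpSignal(long pollIntervalMillis) {
        this.pollIntervalMillis = pollIntervalMillis;
    }

    // Called by the writer right after an outbox write.
    void notifyNewEntry() {
        wakeUp.release();
    }

    // Returns true if signaled, false if the poll interval elapsed first.
    boolean waitForWork() {
        try {
            boolean signaled = wakeUp.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS);
            wakeUp.drainPermits();   // collapse a burst of signals into one wake-up
            return signaled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

    Draining is safe as long as the writer stores the entry before signaling: a drained permit always belongs to an entry that the publish cycle about to run will see.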

    The Publish Loop

    public void publishPendingEntries() {
        if (sharedHazelcast == null) {
            if (!noSharedClusterWarningLogged) {
                logger.warn("No shared Hazelcast instance — outbox delivery skipped");
                noSharedClusterWarningLogged = true;
            }
            return;
        }
    
        List<OutboxEntry> claimed = outboxStore.claimPending(
                properties.getMaxBatchSize(), memberUuid);
    
        if (claimed.isEmpty()) {
            return;
        }
    
        for (OutboxEntry entry : claimed) {
            try {
                ITopic<GenericRecord> topic = sharedHazelcast.getTopic(entry.getEventType());
                topic.publish(entry.getEventRecord());
                outboxStore.markDelivered(entry.getEventId());
            } catch (Exception e) {
                if (entry.getRetryCount() + 1 >= properties.getMaxRetries()) {
                    outboxStore.markFailed(entry.getEventId(),
                            "Max retries exceeded: " + e.getMessage());
                } else {
                    outboxStore.incrementRetryCount(entry.getEventId(), e.getMessage());
                }
            }
        }
    }
    

    Note claimPending rather than pollPending. The claiming mechanism uses an EntryProcessor to atomically transition entries from PENDING to CLAIMED, tagging them with the claiming member’s UUID. This prevents two publisher instances from delivering the same event, which matters once you’re running multiple replicas. (Neither claimPending nor the CLAIMED status appears in the OutboxStore and OutboxEntry listings earlier in this post.)
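
    The claim step boils down to a compare-and-transition that must be atomic per key. The sketch below imitates that with ConcurrentHashMap.computeIfPresent, which runs its remapping function atomically for a given key, in roughly the role the EntryProcessor plays on the IMap. ClaimSketch, Slot, and tryClaim are hypothetical names, and the real ClaimEntryProcessor may well look different.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the claim step: only one caller can move an
// entry from PENDING to CLAIMED, no matter how many try at once.
class ClaimSketch {

    // Hypothetical value type: a status plus the member that claimed the entry.
    static final class Slot {
        final String status;
        final String claimedBy;

        Slot(String status, String claimedBy) {
            this.status = status;
            this.claimedBy = claimedBy;
        }
    }

    private final Map<String, Slot> outbox = new ConcurrentHashMap<>();

    void write(String eventId) {
        outbox.put(eventId, new Slot("PENDING", null));
    }

    // Returns true only for the caller that performed the PENDING -> CLAIMED transition.
    boolean tryClaim(String eventId, String memberUuid) {
        boolean[] won = {false};
        outbox.computeIfPresent(eventId, (id, slot) -> {
            if ("PENDING".equals(slot.status)) {
                won[0] = true;
                return new Slot("CLAIMED", memberUuid);
            }
            return slot;   // already claimed, delivered, or failed: leave untouched
        });
        return won[0];
    }

    String claimedBy(String eventId) {
        Slot slot = outbox.get(eventId);
        return slot == null ? null : slot.claimedBy;
    }
}
```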

    When no shared cluster is configured (single-node dev mode), the publisher logs one warning and then skips delivery on every cycle. Events pile up as PENDING in the outbox and drain once a shared cluster is available.

    Retry escalation is per-entry:

    Attempt 1: fails → incrementRetryCount (retryCount=1)
    Attempt 2: fails → incrementRetryCount (retryCount=2)
    ...
    Attempt 5: fails → markFailed (retryCount + 1 = 5 >= maxRetries = 5)
    

    Once marked FAILED, the entry stops showing up in claim results. The failure reason is preserved for debugging.
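
    The escalation rule is small enough to model in isolation. The sketch below mirrors the check from the catch block in the publish loop (retryCount + 1 >= maxRetries); the RetryEscalation class and its method names are invented for illustration.

```java
// Illustrative model of the per-entry escalation rule from the publish loop.
class RetryEscalation {

    enum Status { PENDING, FAILED }

    // The attempt that just failed is counted (retryCount + 1) before the comparison.
    static Status afterFailure(int retryCountBefore, int maxRetries) {
        return (retryCountBefore + 1 >= maxRetries) ? Status.FAILED : Status.PENDING;
    }

    // Number of consecutive failed attempts it takes for an entry to reach FAILED.
    static int attemptsUntilFailed(int maxRetries) {
        int retryCount = 0;
        int attempts = 0;
        while (true) {
            attempts++;
            if (afterFailure(retryCount, maxRetries) == Status.FAILED) {
                return attempts;
            }
            retryCount++;   // corresponds to incrementRetryCount
        }
    }
}
```

    With the default max-retries of 5, the fifth consecutive failure is the one that marks the entry FAILED.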

    Scheduling

    OutboxAutoConfiguration hooks the publisher into Spring’s task scheduler:

    @EnableScheduling
    public class OutboxAutoConfiguration implements SchedulingConfigurer {
    
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.addFixedDelayTask(() -> {
                outboxPublisher.waitForWork();       // blocks until signaled or timeout
                outboxPublisher.publishPendingEntries();
            }, 1);  // 1ms loop delay — actual timing controlled by semaphore
        }
    }
    

    The 1ms fixed delay means the loop restarts almost immediately after each cycle, but waitForWork() controls the actual pacing. The thread blocks on the semaphore until either a permit is released or the poll interval elapses. Near-instant delivery under normal load, guaranteed pickup if a signal is missed.


    Integration with EventSourcingController

    The controller’s republishToSharedCluster now checks for an outbox store first:

    private void republishToSharedCluster(PendingCompletion<K> pending) {
        if (sharedHazelcast == null || pending.eventRecord == null || pending.eventType == null) {
            return;
        }
        if (outboxStore != null) {
            OutboxEntry entry = new OutboxEntry(
                    pending.completionInfo.getEventId(),
                    pending.eventType,
                    pending.eventRecord
            );
            outboxStore.write(entry);
            if (outboxPublisher != null) {
                outboxPublisher.notifyNewEntry();
            }
        } else {
            // Legacy direct publish (when outbox is disabled)
            try {
                ITopic<GenericRecord> topic = sharedHazelcast.getTopic(pending.eventType);
                topic.publish(pending.eventRecord);
            } catch (Exception e) {
                logger.warn("Failed to republish event {}: {}", pending.eventType, e.getMessage());
            }
        }
    }
    

    Fully backward compatible. When outboxStore is injected, events go through the durable path. When it’s null, you get the old fire-and-forget behavior. The OutboxStore is wired through each service’s config as an optional dependency:

    @Bean
    public EventSourcingController<Order, String, DomainEvent<Order, String>> orderController(
            HazelcastInstance hazelcastInstance,
            @Qualifier("hazelcastClient") HazelcastInstance hazelcastClient,
            @Autowired(required = false) OutboxStore outboxStore,
            ...) {
        return EventSourcingController.builder()
                .hazelcast(hazelcastInstance)
                .sharedHazelcast(hazelcastClient)
                .outboxStore(outboxStore)
                .build();
    }
    

    Delivery Guarantees

    The outbox provides at-least-once delivery. If the publisher crashes after publishing to the ITopic but before calling markDelivered(), the next cycle picks up the same entry and delivers it again. Events are never lost as long as the embedded Hazelcast instance’s IMap data is intact.
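
    A tiny simulation makes the duplicate visible. This is a sketch under simplifying assumptions: AtLeastOnceSketch is a made-up class, it ignores the claiming step entirely, and a "crash" is modeled by returning from the cycle after the publish but before the delivered mark.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative simulation of at-least-once delivery.
class AtLeastOnceSketch {

    final Map<String, Boolean> delivered = new LinkedHashMap<>(); // eventId -> marked delivered?
    final List<String> received = new ArrayList<>();              // what the consumer saw

    void write(String eventId) {
        delivered.put(eventId, false);
    }

    void publishCycle(boolean crashBeforeMark) {
        for (Map.Entry<String, Boolean> entry : delivered.entrySet()) {
            if (entry.getValue()) {
                continue;                     // already marked delivered
            }
            received.add(entry.getKey());     // the publish reaches the consumer
            if (crashBeforeMark) {
                return;                       // process dies before markDelivered()
            }
            entry.setValue(true);             // markDelivered()
        }
    }
}
```

    One crashed cycle followed by one healthy cycle leaves the consumer with the same event twice, and nothing is lost.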

    At-least-once means consumers may see duplicates. That’s where the Idempotency Guard from Part 9 comes in — it deduplicates on the consumer side, complementing the outbox’s guaranteed delivery.

    As for ordering: events for the same aggregate are written to the outbox in sequence order (the Jet pipeline processes them sequentially), and claimPending sorts by createdAt. But if two events are pending simultaneously and the first one fails while the second succeeds, they’ll arrive out of order. For our saga use case that’s acceptable — each step is identified by sagaId and eventType, and the saga state machine handles duplicates and out-of-order delivery.


    Configuration

    framework.outbox.*

    Property          Default      Description
    enabled           true         Master toggle for the outbox pattern
    poll-interval     1000 (ms)    Fallback interval if signal is missed
    max-batch-size    50           Maximum entries per poll cycle
    max-retries       5            Delivery attempts before permanent failure
    entry-ttl         24h          How long DELIVERED entries survive in the map

    Metrics

    Metric                      Type       Description
    outbox.entries.written      Counter    Events written to the outbox
    outbox.entries.delivered    Counter    Events delivered to shared cluster
    outbox.entries.failed       Counter    Events permanently failed
    outbox.publish.duration     Timer      Time per publish cycle

    To disable the outbox and use direct publishing:

    framework:
      outbox:
        enabled: false
    

    What’s Next

    The outbox guarantees events reach the shared cluster. But what happens when they get there and the consumer can’t process them? The consumer might crash, the business logic might throw, the circuit breaker might be open.

    In Part 9, we add two patterns that work together: a Dead Letter Queue that captures events that fail consumer-side processing, and an Idempotency Guard that prevents duplicate processing — the natural flip side of at-least-once delivery.


    Next up: Dead Letter Queues and Idempotency

    Previous: Circuit Breakers and Retry: Resilient Hazelcast Sagas

    Code: github.com/myawnhc/hazelcast-microservices-framework — clone it, docker-compose up, and the framework boots locally with sample data.

    Circuit Breakers and Retry: Resilient Hazelcast Sagas

    Part 7 in the “Building Event-Driven Microservices with Hazelcast” series


    Introduction

    A commercial airliner doesn’t fall out of the sky when an engine fails. It keeps flying. The remaining engine provides enough thrust to reach the nearest airport, the crew follows a well-rehearsed procedure, and the passengers, ideally, never know how close things got. Aviation engineers figured this out decades ago: you can’t prevent every failure, so you build the system to keep working when parts of it stop. (There’s even a great acronym for it: ETOPS, which officially stands for Extended-range Twin-engine Operations Performance Standards, but which pilots will tell you really means “Engines Turn Or Passengers Swim.”)

    Microservices need the same philosophy. Not because individual services fail as dramatically as a jet engine, but because they fail far more often. A garbage collection pause. A network blip. A downstream provider having a bad day. A deployment rolling through the cluster at 2 AM. In a monolith, these are minor hiccups — the kind of thing you might not even notice in the logs. In a distributed system where five services coordinate through asynchronous events, a hiccup in one service can propagate to all five in the time it takes to brew a cup of coffee.

    And the ways things go wrong are… creative. The catalog of distributed system failure modes is large enough to fill a textbook. Several textbooks, actually — and people have. Too many for a single pattern or a single blog post.

    So we’re spending the next three posts on resilience. This one covers circuit breakers and retry — protecting saga listeners when downstream services misbehave. Part 8 tackles the transactional outbox pattern, which guarantees events aren’t lost between producer and consumer. And Part 9 adds dead letter queues and idempotency guards — the safety nets for events that fail permanently or arrive more than once. Three different failure modes, three different mechanisms.

    Back in Part 4, we built a choreographed saga for order fulfillment. Three services — Inventory, Payment, and Order — coordinate through Hazelcast ITopic events published on a shared cluster. The happy path works beautifully. Without resilience patterns, though, a single struggling service can drag the whole saga down with it. A slow Payment Service fills up the Inventory Service’s thread pool with blocked calls. A transient network error permanently loses an event. A burst of failures overwhelms everything simultaneously.

    That’s what we’re fixing.


    The Problem: Cascading Failures

    Here’s the order fulfillment saga on a good day:

    [Figure: order fulfillment saga happy path. The Inventory, Payment, and Order services exchange OrderCreated, StockReserved, PaymentProcessed, and OrderConfirmed events over Hazelcast ITopic.]

    Each step is an ITopic message on the shared Hazelcast cluster. Each listener calls a local service method — IMap operations, Jet pipeline processing, further ITopic publishing. Events flow, state updates, everyone’s happy.

    Now imagine the Payment Service is having a rough morning. Some downstream payment provider is dragging, and every StockReserved event that arrives takes 30 seconds to process instead of the normal 50 milliseconds. Without any resilience mechanism, here’s what unfolds:

    1. Inventory keeps publishing StockReserved events at the normal rate
    2. Payment’s listener thread pool fills up with slow calls
    3. New events queue behind the blocked threads
    4. ITopic backpressure eventually slows the shared cluster itself
    5. Other listeners on the same cluster — including Inventory and Order — start seeing delays
    6. The entire saga grinds to a halt

    One service had a problem. Now every service has a problem. This is a cascade failure, and it’s the defining hazard of distributed architectures. The shared communication fabric that makes coordination possible is the same fabric that propagates failure.


    Enter Resilience4j

    The patterns we need — circuit breakers, retry with backoff, bulkheads, rate limiters — have been well understood for years. Netflix popularized them in the Java world with Hystrix, which became the standard library for microservice resilience through most of the 2010s. But Netflix put Hystrix into maintenance mode in 2018 and eventually stopped development entirely.

    The successor that emerged is Resilience4j. It’s a lightweight fault tolerance library for Java 8+ built around functional composition — you wrap a Supplier or Runnable with decorators, and the decorators handle the resilience logic. It’s not just a circuit breaker library, though that’s what most people know it for. It actually provides six core modules: circuit breaker, retry, bulkhead (resource isolation), rate limiter, time limiter, and cache. Each is standalone. You pick what you need and leave the rest on the shelf.

    There are other options — Failsafe is a solid zero-dependency alternative, and Alibaba’s Sentinel targets high-traffic rate limiting scenarios. But Resilience4j has become the de facto choice for Spring Boot microservices. The Spring integration is mature, Micrometer metrics work out of the box, and @ConfigurationProperties binding means your resilience settings live in the same YAML as everything else. For our framework, we’re using two of the six modules: CircuitBreaker and Retry.


    Circuit Breakers: Automatic Service Isolation

    A circuit breaker does what it sounds like. It monitors the failure rate of an operation and automatically stops calling it when failures exceed a threshold — the same idea as the breaker panel in your house. Too much current flows through the circuit, the breaker trips, the wiring doesn’t catch fire. In our case, “too much current” means too many failed calls, and “the wiring” is every other service sharing that communication path.

    Three States

    [Figure: circuit breaker state machine. CLOSED trips to OPEN when the failure rate crosses the threshold, OPEN moves to HALF-OPEN after the wait duration, and HALF-OPEN returns to CLOSED on success or back to OPEN on failure.]

    CLOSED is normal operation. All calls pass through, and the circuit breaker quietly records outcomes in a sliding window. OPEN means the breaker has tripped — all calls are immediately rejected with a CallNotPermittedException, and no load reaches the downstream service at all. HALF-OPEN is the recovery probe: a limited number of test calls pass through. If they succeed, the breaker returns to CLOSED. If they fail, back to OPEN. Rinse and repeat until the downstream service gets its act together.
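
    To make the transitions concrete, here is a deliberately simplified breaker in plain Java. It is not how Resilience4j works internally: TinyCircuitBreaker is a made-up class that trips on consecutive failures instead of a sliding-window failure rate, and it takes an injected clock so the wait duration can be controlled in a test.

```java
import java.util.function.LongSupplier;

// Simplified illustration of the three states; the transitions are the point,
// not the failure-rate math.
class TinyCircuitBreaker {

    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long waitMillis;
    private final LongSupplier clock;   // injected so time can be controlled

    private State state = State.CLOSED;
    private int consecutiveFailures = 0;
    private long openedAt = 0;

    TinyCircuitBreaker(int failureThreshold, long waitMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.waitMillis = waitMillis;
        this.clock = clock;
    }

    // Returns true if a call may proceed right now.
    synchronized boolean tryAcquirePermission() {
        if (state == State.OPEN && clock.getAsLong() - openedAt >= waitMillis) {
            state = State.HALF_OPEN;    // wait elapsed: let a probe call through
        }
        return state != State.OPEN;
    }

    synchronized void onSuccess() {
        consecutiveFailures = 0;
        state = State.CLOSED;           // a successful probe closes the breaker
    }

    synchronized void onFailure() {
        consecutiveFailures++;
        if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
            state = State.OPEN;         // trip, or re-trip after a failed probe
            openedAt = clock.getAsLong();
        }
    }

    synchronized State state() {
        return state;
    }
}
```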

    The Framework’s ResilientServiceInvoker

    Rather than sprinkling Resilience4j decorators at every call site, we centralized everything into ResilientServiceInvoker:

    public class ResilientServiceInvoker implements ResilientOperations {
    
        private final CircuitBreakerRegistry circuitBreakerRegistry;
        private final RetryRegistry retryRegistry;
        private final ResilienceProperties properties;
    
        public <T> T execute(final String name, final Supplier<T> operation) {
            if (!properties.isEnabled()) {
                return operation.get();
            }
    
            final CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(name);
            final Retry retry = retryRegistry.retry(name);
    
            final Supplier<T> decoratedSupplier = CircuitBreaker.decorateSupplier(circuitBreaker,
                    Retry.decorateSupplier(retry, operation));
    
            try {
                return decoratedSupplier.get();
            } catch (CallNotPermittedException e) {
                logger.warn("Circuit breaker '{}' is OPEN — rejecting call", name);
                throw new ResilienceException(
                        "Circuit breaker '" + name + "' is open, call rejected", name, e);
            } catch (Exception e) {
                logger.error("Operation '{}' failed after retries: {}", name, e.getMessage());
                throw new ResilienceException(
                        "Operation '" + name + "' failed after retries", name, e);
            }
        }
    }
    

    A few things to notice here. Each call to execute("inventory-stock-reservation", …) creates or retrieves a circuit breaker and retry instance with that name. This means each saga step gets its own independent circuit breaker, so a payment failure won’t trip the inventory breaker.

    The decoration order matters: retry wraps the operation first, then the circuit breaker wraps the retry. So the circuit breaker sees the final outcome after all retries are exhausted. A transient failure that succeeds on the second attempt counts as a success for the circuit breaker. If you stacked them the other way around, every individual failed attempt would register as a circuit breaker failure, and you’d trip the breaker much faster than you intended.
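
    You can see the effect of the ordering with two hand-written decorators. These are not Resilience4j classes: withRetry and withFailureCounter are illustrative helpers, and the failure counter only stands in for the breaker’s bookkeeping.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative decorators showing what the "breaker" observes under each nesting order.
class DecorationOrder {

    // Retries the wrapped supplier up to maxAttempts times.
    static <T> Supplier<T> withRetry(int maxAttempts, Supplier<T> inner) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return () -> {
            RuntimeException last = null;
            for (int i = 0; i < maxAttempts; i++) {
                try {
                    return inner.get();
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    // Stands in for the circuit breaker: counts every failure that reaches it.
    static <T> Supplier<T> withFailureCounter(AtomicInteger failures, Supplier<T> inner) {
        return () -> {
            try {
                return inner.get();
            } catch (RuntimeException e) {
                failures.incrementAndGet();
                throw e;
            }
        };
    }

    // An operation that fails on its first call and succeeds afterwards.
    static Supplier<String> failsOnce() {
        AtomicInteger calls = new AtomicInteger();
        return () -> {
            if (calls.incrementAndGet() == 1) {
                throw new IllegalStateException("transient");
            }
            return "ok";
        };
    }
}
```

    With the counter on the outside (the order ResilientServiceInvoker uses), a call that succeeds on its second attempt records zero failures. Swap the nesting and the same call records one.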

    And there’s a kill switch. When framework.resilience.enabled=false, the execute method just calls the operation directly. Zero overhead. This matters for testing and for environments where resilience is handled at a different layer — a service mesh, maybe, or a cloud provider’s load balancer.

    The ResilientOperations Interface

    We extract an interface from the concrete class:

    public interface ResilientOperations {
        <T> T execute(String name, Supplier<T> operation);
        void executeRunnable(String name, Runnable operation);
        <T> CompletableFuture<T> executeAsync(String name, Supplier<CompletableFuture<T>> operation);
    }
    

    This is the same workaround we used for ServiceClientOperations in Part 6. On Java 25, Mockito’s inline mock maker can’t mock concrete classes in certain JVM configurations, so you extract an interface and mock that instead. Not the most glamorous reason to create an abstraction, but it works.

    Three Flavors

    The invoker supports three calling patterns:

    // Synchronous — returns a value
    String result = invoker.execute("orderSaga", () -> processEvent(event));
    
    // Fire-and-forget — void operation
    invoker.executeRunnable("paymentListener", () -> publishToTopic(event));
    
    // Async — returns CompletableFuture
    CompletableFuture<Product> future = invoker.executeAsync("inventory-stock-reservation",
            () -> inventoryService.reserveStockForSaga(productId, quantity, ...));
    

    The async variant is the one our saga listeners actually use — inventory, payment, and order service calls all return CompletableFuture.


    Wiring into the Saga Listeners

    The saga listeners from Part 4 now inject ResilientOperations as an optional dependency:

    @Component
    public class InventorySagaListener {
    
        private final ProductService inventoryService;
        private final HazelcastInstance hazelcast;
        private ResilientOperations resilientServiceInvoker;
    
        @Autowired(required = false)
        public void setResilientOperations(ResilientOperations resilientServiceInvoker) {
            this.resilientServiceInvoker = resilientServiceInvoker;
        }

        // ... constructor and listener methods omitted
    }

    That @Autowired(required = false) is doing important work. If resilience is disabled — or if the Resilience4j dependency isn’t even on the classpath — the listener still functions. It just calls the service directly, no wrapping. The saga worked before we added resilience; it should keep working without it.

    Each listener has a helper that handles the null check:

    private <T> CompletableFuture<T> executeWithResilience(
            final String name, final Supplier<CompletableFuture<T>> operation) {
        if (resilientServiceInvoker != null) {
            return resilientServiceInvoker.executeAsync(name, operation);
        }
        return operation.get();
    }
    

    And the actual saga step looks like this:

    executeWithResilience("inventory-stock-reservation",
            () -> inventoryService.reserveStockForSaga(
                    productId, quantity, orderId, sagaId, correlationId,
                    customerId, total, currency, "CREDIT_CARD"
            )
    ).whenComplete((product, error) -> {
        if (error != null) {
            sendToDeadLetterQueue(record, "OrderCreated", error);
        } else {
            logger.info("Stock reserved for saga: productId={}, quantity={}, orderId={}, sagaId={}",
                    productId, quantity, orderId, sagaId);
        }
    });
    

    The circuit breaker name inventory-stock-reservation is specific to this saga step. Each step across the three services gets its own name and its own circuit breaker:

    Circuit Breaker Name           Saga Step                            Service
    inventory-stock-reservation    Reserve stock on OrderCreated        Inventory
    inventory-stock-release        Release stock on compensation        Inventory
    payment-processing             Process payment on StockReserved     Payment
    payment-refund                 Refund payment on compensation       Payment
    order-confirmation             Confirm order on PaymentProcessed    Order
    order-cancellation             Cancel order on compensation         Order

    Six independent circuit breakers. If payment processing is struggling, the inventory breakers stay closed and keep doing their job.


    Retry with Exponential Backoff

    Transient failures — network blips, temporary overload, brief GC pauses — are the most common failure mode in distributed systems. Most of them resolve on their own within seconds. Retry is the first line of defense.

    The Thundering Herd

    But naive retry (retry right away, same interval, keep hammering) can make things actively worse. Picture this: a service buckles under load, and 100 clients all get errors simultaneously. They all retry 500ms later. The service sees a spike of 100 simultaneous requests. It fails again. They all retry again at the 1000ms mark. Another spike. Same result.

    This is the thundering herd problem. Everyone backs off at the same fixed interval, and everyone comes stampeding back at the same moment. The retry mechanism that was supposed to help is the thing keeping the service down.

    Exponential backoff breaks the herd apart:

    Attempt 1: immediate
    Attempt 2: wait 500ms
    Attempt 3: wait 1000ms  (500ms × 2.0)
    Attempt 4: wait 2000ms  (1000ms × 2.0)
    

    The growing intervals give the struggling service breathing room. Callers that started their retry sequences at slightly different moments stay staggered, and each wave is smaller than the last because some callers succeed along the way. Callers that failed at exactly the same instant still retry in lockstep, though, which is why backoff is often paired with a bit of random jitter.
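
    The schedule above is just the initial wait multiplied by the multiplier once per retry. Here is a minimal sketch of that arithmetic; BackoffSchedule is a hypothetical helper, not a framework class. The withJitter method is an optional extra that the configuration in this post does not use. It is included because randomizing each delay is a common way to keep callers from retrying in lockstep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative arithmetic for exponential backoff delays.
class BackoffSchedule {

    // Wait before retry number n (1-based): initial * multiplier^(n - 1).
    static long delayMillis(long initialMillis, double multiplier, int retryNumber) {
        return Math.round(initialMillis * Math.pow(multiplier, retryNumber - 1));
    }

    // Delays for the first `retries` retries, in order.
    static List<Long> schedule(long initialMillis, double multiplier, int retries) {
        List<Long> delays = new ArrayList<>();
        for (int n = 1; n <= retries; n++) {
            delays.add(delayMillis(initialMillis, multiplier, n));
        }
        return delays;
    }

    // Optional jitter (an addition, not part of the configuration in this post):
    // picks a delay uniformly from [delay * (1 - factor), delay * (1 + factor)).
    static long withJitter(long delayMillis, double factor) {
        if (delayMillis <= 0 || factor <= 0) {
            return delayMillis;
        }
        double low = delayMillis * (1.0 - factor);
        double high = delayMillis * (1.0 + factor);
        return Math.round(ThreadLocalRandom.current().nextDouble(low, high));
    }
}
```

    schedule(500, 2.0, 3) yields 500, 1000, and 2000 milliseconds, matching the waits before attempts 2, 3, and 4 above.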

    Configuration

    The framework exposes all of this through ResilienceProperties:

    framework:
      resilience:
        enabled: true
        retry:
          max-attempts: 3
          wait-duration: 500ms
          enable-exponential-backoff: true
          exponential-backoff-multiplier: 2.0
    

    The auto-configuration translates these into a Resilience4j RetryConfig:

    @Bean
    @ConditionalOnMissingBean
    public RetryRegistry retryRegistry(final ResilienceProperties properties) {
        final ResilienceProperties.RetryProperties retryProps = properties.getRetry();
    
        final RetryConfig.Builder<?> builder = RetryConfig.custom()
                .maxAttempts(retryProps.getMaxAttempts())
                .retryOnException(e -> !(e instanceof NonRetryableException));
    
        if (retryProps.isEnableExponentialBackoff()) {
            builder.intervalFunction(IntervalFunction
                    .ofExponentialBackoff(
                            retryProps.getWaitDuration(),
                            retryProps.getExponentialBackoffMultiplier()));
        } else {
            builder.waitDuration(retryProps.getWaitDuration());
        }
    
        return RetryRegistry.of(builder.build());
    }
    

    Two things to note. The retryOnException predicate excludes NonRetryableException — we’ll get to that in a moment. And when enable-exponential-backoff is false, it falls back to a fixed interval between attempts.


    NonRetryableException: When to Stop Trying

    Not every failure is transient. “Payment declined” will never succeed on retry — the credit card is invalid. “Insufficient stock” is deterministic — the warehouse genuinely doesn’t have the product. Retrying these wastes time, wastes resources, and — if the circuit breaker is counting — burns through your failure budget for no reason.

    The framework defines a marker interface:

    public interface NonRetryableException {
        // Marker interface — business exceptions implement this to skip retry
    }
    

    Service exceptions opt in:

    public class InsufficientStockException extends RuntimeException
            implements NonRetryableException {
        public InsufficientStockException(String message) {
            super(message);
        }
    }
    
    public class PaymentDeclinedException extends RuntimeException
            implements NonRetryableException {
        public PaymentDeclinedException(String message) {
            super(message);
        }
    }
    

    Why a marker interface instead of a base class? Because these exceptions already extend RuntimeException. Java doesn’t have multiple inheritance, but it does have multiple interfaces. The marker lets any exception opt out of retry without changing its class hierarchy.

    The retry configuration’s predicate is one line:

    .retryOnException(e -> !(e instanceof NonRetryableException))
    

    When retry encounters one of these, it fails immediately. No backoff, no additional attempts. But the circuit breaker still records it as a failure — it still counts toward the failure rate threshold. This is the right behavior. If a service is returning “payment declined” for every single request, something is systematically wrong, and the circuit breaker should trip.
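To make the predicate's effect concrete, here is a minimal retry loop in plain Java with no Resilience4j involved. RetrySketch, callWithRetry, and the local copies of the marker interface and exception are stand-ins for illustration only; the framework itself delegates this work to Resilience4j's Retry.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative stand-ins for the framework types shown above.
interface NonRetryableException { }

class PaymentDeclinedException extends RuntimeException implements NonRetryableException {
    PaymentDeclinedException(String message) { super(message); }
}

class RetrySketch {

    // Calls the operation up to maxAttempts times; a NonRetryableException ends the loop at once.
    static <T> T callWithRetry(Supplier<T> operation, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.get();
            } catch (RuntimeException e) {
                if (e instanceof NonRetryableException) {
                    throw e; // business failure: retrying cannot help
                }
                last = e;    // transient failure: try again (backoff omitted for brevity)
            }
        }
        throw last; // all attempts failed with retryable errors
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        try {
            callWithRetry(() -> {
                calls.incrementAndGet();
                throw new PaymentDeclinedException("card declined");
            }, 3);
        } catch (PaymentDeclinedException e) {
            System.out.println("attempts made: " + calls.get()); // prints "attempts made: 1"
        }
    }
}
```

Even with three attempts allowed, the declined payment is tried exactly once. Swap in a plain IllegalStateException and the same loop uses all three.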


    Retry Observability

    Resilience4j publishes events for every retry attempt, and the framework hooks into them for structured logging and a custom metric:

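    public class RetryEventListener {
    
        private static final Logger logger = LoggerFactory.getLogger(RetryEventListener.class);
    
        private final MeterRegistry meterRegistry;
    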
        public RetryEventListener(final RetryRegistry retryRegistry,
                                  final MeterRegistry meterRegistry) {
            this.meterRegistry = meterRegistry;
    
            retryRegistry.getAllRetries().forEach(this::registerListeners);
            retryRegistry.getEventPublisher().onEntryAdded(
                    event -> registerListeners(event.getAddedEntry()));
        }
    
        private void registerListeners(final Retry retry) {
            final var eventPublisher = retry.getEventPublisher();
            eventPublisher.onRetry(this::onRetry);
            eventPublisher.onSuccess(this::onSuccess);
            eventPublisher.onError(this::onError);
            eventPublisher.onIgnoredError(this::onIgnoredError);
        }
    }
    

    Four event types give you the full picture:

    | Event | Log Level | What happened |
    |---|---|---|
    | onRetry | WARN | An attempt failed, trying again |
    | onSuccess | INFO | Eventually succeeded |
    | onError | ERROR | All retries exhausted |
    | onIgnoredError | INFO | Non-retryable, skipped retry |

    That last one — onIgnoredError — needed a custom Micrometer counter because Resilience4j’s built-in TaggedRetryMetrics doesn’t track ignored errors:

    private void onIgnoredError(final RetryOnIgnoredErrorEvent event) {
        logger.info("Non-retryable exception for '{}', skipping retry: {}",
                event.getName(), event.getLastThrowable().getMessage());
    
        Counter.builder("framework.resilience.retry.ignored")
                .description("Count of non-retryable exceptions that skipped retry")
                .tag("name", event.getName())
                .register(meterRegistry)
                .increment();
    }
    

    In practice, the logs tell you a clear story. A transient failure that recovers:

    WARN  RetryEventListener - Retry attempt #1 for 'payment-processing': Connection refused
    WARN  RetryEventListener - Retry attempt #2 for 'payment-processing': Connection refused
    INFO  RetryEventListener - 'payment-processing' succeeded after 2 attempt(s)
    

    A business exception that gets kicked straight to the dead letter queue:

    INFO  RetryEventListener - Non-retryable exception for 'payment-processing',
          skipping retry: Insufficient funds for amount 15000.00
    

    The ResilienceException Wrapper

    When an operation exhausts all retries or gets rejected by an open circuit breaker, the framework wraps the failure in a ResilienceException:

    public class ResilienceException extends RuntimeException {
    
        private final String operationName;
    
        public ResilienceException(String message, String operationName, Throwable cause) {
            super(message, cause);
            this.operationName = operationName;
        }
    
        public String getOperationName() {
            return operationName;
        }
    }
    

    The operationName field tells downstream handlers which circuit breaker failed. The dead letter queue integration (Part 9) uses this to classify failures:

    if (error instanceof ResilienceException) {
        logger.warn("Circuit breaker open, saga step deferred: eventId={}", eventId);
    } else {
        logger.error("Failed to process event: {}", eventId, error);
    }
    

    Auto-Configuration

    The whole resilience stack is wired through a single auto-configuration class:

    @Configuration
    @ConditionalOnClass(CircuitBreakerRegistry.class)
    @ConditionalOnProperty(name = "framework.resilience.enabled", matchIfMissing = true)
    @EnableConfigurationProperties(ResilienceProperties.class)
    public class ResilienceAutoConfiguration {
    
        @Bean @ConditionalOnMissingBean
        public CircuitBreakerRegistry circuitBreakerRegistry(ResilienceProperties properties) { ... }
    
        @Bean @ConditionalOnMissingBean
        public RetryRegistry retryRegistry(ResilienceProperties properties) { ... }
    
        @Bean @ConditionalOnMissingBean
        public ResilientServiceInvoker resilientServiceInvoker(...) { ... }
    
        @Bean @ConditionalOnMissingBean(TaggedCircuitBreakerMetrics.class)
        public TaggedCircuitBreakerMetrics taggedCircuitBreakerMetrics(...) { ... }
    
        @Bean @ConditionalOnMissingBean(TaggedRetryMetrics.class)
        public TaggedRetryMetrics taggedRetryMetrics(...) { ... }
    
        @Bean @ConditionalOnMissingBean
        public RetryEventListener retryEventListener(...) { ... }
    }
    

    Three conditionals control activation. @ConditionalOnClass(CircuitBreakerRegistry.class) means the whole thing only activates when Resilience4j is on the classpath — services that don’t include the dependency don’t get any resilience beans. @ConditionalOnProperty(…, matchIfMissing = true) means it’s enabled by default; set framework.resilience.enabled=false to turn it off. And every individual bean is @ConditionalOnMissingBean, so the application can override any piece by defining its own bean.

    Six beans total:

    1. CircuitBreakerRegistry — circuit breaker instances, configured from properties
    2. RetryRegistry — retry instances with optional exponential backoff
    3. ResilientServiceInvoker — the decorator that wraps operations
    4. TaggedCircuitBreakerMetrics — binds circuit breaker metrics to Micrometer
    5. TaggedRetryMetrics — binds retry metrics to Micrometer
    6. RetryEventListener — structured logging and the custom ignored-error counter

    Per-Instance Tuning

    Different saga steps have different tolerance for failure. Stock reservation should be fast and reliable — if it’s failing, something is seriously wrong, and we want the circuit to trip quickly. Payment processing, on the other hand… payment providers are notoriously flaky. You’d rather tolerate a higher failure rate and give the provider more time to sort itself out before you start rejecting everything.

    The framework supports per-instance overrides in each service’s application.yml:

    framework:
      resilience:
        enabled: true
        circuit-breaker:
          failure-rate-threshold: 50
          wait-duration-in-open-state: 10s
          sliding-window-size: 10
          minimum-number-of-calls: 5
          permitted-number-of-calls-in-half-open-state: 3
        retry:
          max-attempts: 3
          wait-duration: 500ms
          enable-exponential-backoff: true
          exponential-backoff-multiplier: 2.0
        instances:
          inventory-stock-reservation:
            circuit-breaker:
              failure-rate-threshold: 40
              wait-duration-in-open-state: 5s
            retry:
              max-attempts: 2
          payment-processing:
            circuit-breaker:
              failure-rate-threshold: 60
              wait-duration-in-open-state: 15s
            retry:
              max-attempts: 5
              wait-duration: 1s
    

    The instances map lets any named circuit breaker override the defaults:

    public CircuitBreakerProperties getCircuitBreakerForInstance(final String name) {
        final InstanceProperties instance = instances.get(name);
        if (instance != null && instance.getCircuitBreaker() != null) {
            return instance.getCircuitBreaker();
        }
        return circuitBreaker; // Fall back to defaults
    }
    

    So in this configuration, inventory-stock-reservation trips at a 40% failure rate with a 5-second open state and only 2 attempts (the initial call plus one retry); stock checks are idempotent and fast, so there is no point dragging things out. payment-processing tolerates a 60% failure rate with a 15-second open state and 5 attempts (the initial call plus four retries) starting at 1-second intervals. With exponential backoff, the waits are 1, 2, 4, and 8 seconds, so the final attempt comes after about 15 seconds of cumulative waiting. Payment providers get the patience they’ve trained us to give them.
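The waits implied by these settings can be worked out mechanically. The sketch below is illustrative plain Java (RetrySchedule and its methods are hypothetical names), and it assumes that max-attempts counts the initial call and that the inventory instance inherits the 500ms default wait.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: the waits implied by max-attempts, wait-duration, and the multiplier.
class RetrySchedule {

    // One wait per retry; the initial attempt has no wait, so there are maxAttempts - 1 entries.
    static List<Duration> waits(int maxAttempts, Duration base, double multiplier) {
        List<Duration> result = new ArrayList<>();
        double millis = base.toMillis();
        for (int attempt = 2; attempt <= maxAttempts; attempt++) {
            result.add(Duration.ofMillis(Math.round(millis)));
            millis *= multiplier;
        }
        return result;
    }

    // Sum of all waits: the worst-case time spent backing off before giving up.
    static Duration total(List<Duration> waits) {
        return waits.stream().reduce(Duration.ZERO, Duration::plus);
    }

    public static void main(String[] args) {
        List<Duration> payment = waits(5, Duration.ofSeconds(1), 2.0);
        System.out.println(payment + " total=" + total(payment));
        // prints [PT1S, PT2S, PT4S, PT8S] total=PT15S

        List<Duration> inventory = waits(2, Duration.ofMillis(500), 2.0);
        System.out.println(inventory + " total=" + total(inventory));
        // prints [PT0.5S] total=PT0.5S
    }
}
```

These totals cover backoff only; the time each attempt itself takes comes on top.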


    Metrics and Monitoring

    The auto-configuration binds circuit breaker and retry metrics to Micrometer, which exports to Prometheus for Grafana dashboards:

    Circuit Breaker Metrics

    | Metric | Type | Description |
    |---|---|---|
    | resilience4j_circuitbreaker_state | Gauge | Current state (0=CLOSED, 1=OPEN, 2=HALF_OPEN) |
    | resilience4j_circuitbreaker_calls_total | Counter | Total calls by outcome (successful, failed, not_permitted) |
    | resilience4j_circuitbreaker_failure_rate | Gauge | Current failure rate percentage |
    | resilience4j_circuitbreaker_buffered_calls | Gauge | Calls in sliding window |

    Retry Metrics

    | Metric | Type | Description |
    |---|---|---|
    | resilience4j_retry_calls_total | Counter | Total calls by outcome (successful_without_retry, successful_with_retry, failed_with_retry, failed_without_retry) |
    | framework.resilience.retry.ignored | Counter | Non-retryable exceptions (tagged by name) |

    These feed into Grafana panels for saga health — circuit breaker state timeline showing when breakers trip and recover, retry rate over time where a spike tells you something transient is happening, failure rate broken out by saga step so you can see which one is misbehaving, and the non-retryable exception count that separates business logic failures from infrastructure problems.


    Configuration Reference

    framework.resilience.*

    | Property | Default | Description |
    |---|---|---|
    | enabled | true | Master toggle for all resilience features |
    | circuit-breaker.failure-rate-threshold | 50 | Failure rate (%) to trip the breaker |
    | circuit-breaker.wait-duration-in-open-state | 10s | How long to stay open before testing |
    | circuit-breaker.sliding-window-size | 10 | Number of calls in the measurement window |
    | circuit-breaker.sliding-window-type | COUNT_BASED | COUNT_BASED or TIME_BASED |
    | circuit-breaker.minimum-number-of-calls | 5 | Minimum calls before evaluating failure rate |
    | circuit-breaker.permitted-number-of-calls-in-half-open-state | 3 | Test calls in half-open state |
    | retry.max-attempts | 3 | Maximum attempts (including the initial call) |
    | retry.wait-duration | 500ms | Base wait between retries |
    | retry.enable-exponential-backoff | true | Use exponential backoff |
    | retry.exponential-backoff-multiplier | 2.0 | Backoff multiplier |
    | instances.<name>.circuit-breaker.* | (defaults) | Per-instance circuit breaker overrides |
    | instances.<name>.retry.* | (defaults) | Per-instance retry overrides |

    What’s Next

    Circuit breakers and retry handle one category of failure: transient problems during event consumption. The saga listener tries, the call fails, the retry policy kicks in, the circuit breaker keeps the damage from spreading. That covers the consumer side.

    But what about the producer side? When EventSourcingController needs to republish an event to the shared cluster and the cluster is temporarily unreachable, the event just… vanishes. No retry. No circuit breaker. Gone.

    That’s a different failure mode, and it needs a different mechanism. In Part 8, we add the transactional outbox pattern — a durable buffer between event production and cross-cluster delivery that guarantees no events are lost, even when the shared cluster is down. Then Part 9 closes the loop with dead letter queues and idempotency guards for events that exhaust all retries or arrive more than once.


    Next up: The Transactional Outbox Pattern with Hazelcast

    Previous: MCP Server for Microservices: AI-Powered Debugging

    Code: github.com/myawnhc/hazelcast-microservices-framework — clone it, docker-compose up, and the framework boots locally with sample data.
  • MCP Server for Microservices: AI-Powered Debugging

    MCP Server for Microservices: AI-Powered Debugging

    Part 6 in the “Building Event-Driven Microservices with Hazelcast” series


    Introduction

    Over the first five articles, we built an event sourcing framework, a Jet pipeline, materialized views, a choreographed saga pattern, and vector similarity search. That’s a lot of infrastructure. It also means that investigating a problem — say, a failed saga — involves chaining together five or six curl commands across four different services, reading JSON output with your eyes, extracting IDs by hand, and constructing the next request.

    Which is fine. It’s what we’ve always done. But there’s a better option now.

    The Model Context Protocol (MCP) is an open standard that lets AI assistants — Claude, ChatGPT, Copilot, whoever — call tools exposed by external servers. Instead of the assistant guessing at curl commands or asking you to copy-paste output, it directly queries your materialized views, submits events, inspects saga state, and runs demo scenarios.

    In this article, we build an MCP server that bridges AI assistants to our eCommerce microservices. And yes, there is something a little meta about using Claude to build a framework and then building a bridge so Claude can operate the framework. We’re going with it.


    Why Give an AI Access to Your Microservices?

    Consider a typical debugging session. A saga has failed, and you want to know why:

    # Step 1: Find failed sagas
    curl "http://localhost:8083/api/sagas?status=FAILED"
    
    # Step 2: Copy a saga ID from the JSON output
    curl http://localhost:8083/api/sagas/saga-a7f3e2
    
    # Step 3: Check the order that triggered it
    curl http://localhost:8083/api/orders/ord-12345
    
    # Step 4: Check the event history
    curl http://localhost:8083/api/orders/ord-12345/events
    
    # Step 5: Check if stock was released as part of compensation
    curl http://localhost:8082/api/products/prod-67890
    

    Five commands. Each one requires reading JSON output, finding the right ID, and constructing the next request. You’re doing the orchestration in your head, and — let’s be honest — that’s exactly the kind of tedious mechanical chaining that humans are bad at and computers are good at.

    With MCP, the same investigation is a single sentence:

    “Why did the most recent saga fail?”

    The AI calls list_sagas(status="FAILED"), then inspect_saga(sagaId="saga-a7f3e2"), then get_event_history(aggregateId="ord-12345", aggregateType="Order"), interprets all the responses, and gives you a summary:

    “Saga saga-a7f3e2 failed at the payment step. Order ORD-12345 had a total of $15,000 which exceeded the $10,000 payment limit. Compensation ran successfully — stock for product PROD-67890 was released.”

    Three tool calls, zero curl commands, and a root-cause analysis. From one question.


    What Is MCP?

    MCP (Model Context Protocol) is an open specification by Anthropic that defines a standard interface between AI assistants and external tools. Think of it as a contract:

    MCP protocol sequence: the AI assistant sends tools/list and tools/call to the MCP server, which returns tool definitions and JSON results over JSON-RPC

    The protocol uses JSON-RPC 2.0 over one of two transports:

    | Transport | How It Works | Best For |
    |---|---|---|
    | stdio | AI assistant launches the server as a subprocess; communicates via stdin/stdout | Local development with Claude Code or Claude Desktop |
    | SSE (HTTP) | Server runs as a web service; AI connects over HTTP with Server-Sent Events | Docker, remote deployment, multi-user |

    The AI assistant doesn’t need to know anything about Hazelcast, Jet pipelines, or event sourcing. It sees ten tools with descriptions and parameters. The MCP server handles the translation between “query the customer view” and “GET http://account-service:8081/api/customers.”
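For a sense of what crosses the wire, this is roughly the JSON-RPC 2.0 envelope a client sends for a tools/call request. The envelope fields (jsonrpc, id, method, params.name, params.arguments) follow the MCP specification; the tool name and argument names are taken from this article and should be read as assumptions about the running server. ToolCallExample is a hypothetical class, written as a small Java helper so the JSON stays in the same language as the rest of the code.

```java
// Illustrative sketch: the JSON-RPC 2.0 envelope an MCP client sends for a tool call.
class ToolCallExample {

    // Tool and argument names mirror this article; values are inserted without JSON escaping,
    // which is acceptable only for the fixed demo values used here.
    static String toolCallRequest(int id, String viewName, String key) {
        return """
                {
                  "jsonrpc": "2.0",
                  "id": %d,
                  "method": "tools/call",
                  "params": {
                    "name": "query_view",
                    "arguments": { "viewName": "%s", "key": "%s" }
                  }
                }
                """.formatted(id, viewName, key);
    }

    public static void main(String[] args) {
        System.out.print(toolCallRequest(1, "customer", "cust-123"));
    }
}
```

The server answers with a result whose content carries the tool's JSON output, which in our case is whatever the proxied REST call returned.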


    Designing Tools Around Event Sourcing

    The hardest part of building an MCP server isn’t the protocol — it’s deciding what tools to expose. Too many and the AI gets confused about which one to use. Too few and it can’t do useful work. We went back and forth on this and started with seven, organized around the three concerns of an event-sourced system. Three more got added later for dead letter queue recovery, which we’ll get to in a moment.

    Queries (Read Current State)

    | Tool | What It Does |
    |---|---|
    | query_view | Read materialized views: current state of customers, products, orders, payments |
    | get_event_history | Read the event log: how an entity reached its current state |

    These map to the read side of CQRS. Views give you the “what,” event history gives you the “why.”

    Commands (Produce New Events)

    | Tool | What It Does |
    |---|---|
    | submit_event | Create customers, products, orders; cancel orders; reserve stock; process payments; refund payments |
    | run_demo | Execute multi-step scenarios (happy path, payment failure, saga timeout, sample data) |

    Each command produces domain events that flow through the Jet pipeline. run_demo chains multiple commands together to set up investigation targets — a failed payment saga, a timeout scenario, a happy path to compare against.

    Observability (Inspect the System)

    | Tool | What It Does |
    |---|---|
    | inspect_saga | View a saga’s status, steps completed, timing, and failure reason |
    | list_sagas | Browse sagas filtered by status |
    | get_metrics | Aggregated system metrics: saga counts, event throughput, active gauges |

    Dead Letter Queue (Investigate and Replay Failures)

    | Tool | What It Does |
    |---|---|
    | list_dlq_entries | List failed events that landed in the dead letter queue, with a pending-count summary for quick triage |
    | inspect_dlq_entry | View a single DLQ entry: event data, failure reason, saga context, replay count |
    | replay_dlq_entry | Republish a DLQ entry’s event for reprocessing, after the cause is fixed |

    We hadn’t built the DLQ machinery yet when the MCP server first shipped, so these three were added later. The investigation workflow — list, inspect, then decide to replay or not — turned out to map cleanly onto how a human operator works through a queue of failed events. Asking the AI to walk that with you, one entry at a time, is dramatically less tedious than the curl version.

    Ten tools, four categories, no overlap. The AI handles any reasonable question about the system, and tool selection stays reliable — you’d never call get_metrics when you meant query_view, or list_dlq_entries when you meant list_sagas. The shape of the tool decides which question it answers.


    Architecture: A Pure REST Proxy

    The MCP server sits between the AI assistant and the microservices:

    MCP server architecture: an AI assistant connects via the MCP protocol to a Spring Boot MCP server on port 8085, which proxies REST calls to the Account, Inventory, Order, and Payment services

    We made a deliberate choice here: the MCP server has no Hazelcast dependency. It doesn’t join any cluster, doesn’t read IMaps, doesn’t run Jet jobs. It’s a thin REST proxy that translates MCP tool calls into HTTP requests against the existing service APIs.

    Why go to the trouble of keeping them separate? Because coupling the MCP server to Hazelcast would mean classpath conflicts with the services, a dependency on the data layer that makes testing painful, and another component that needs Hazelcast configuration. As a pure proxy, the server needs maybe 128-256 MB of heap, has no classpath conflicts, and you can test every tool by mocking REST responses without running a single service.


    Implementation

    The ServiceClient

    All HTTP communication goes through one class:

    @Component
    public class ServiceClient implements ServiceClientOperations {
    
        private final McpServerProperties properties;
        private final RestClient restClient;
    
        public Map<String, Object> getEntity(String viewName, String id) {
            String url = resolveUrl(viewName) + "/" + id;
            String json = restClient.get().uri(url).retrieve().body(String.class);
            return parseMap(json);
        }
    
        String resolveUrl(String viewName) {
            return switch (viewName.toLowerCase()) {
                case "customer" -> properties.getAccountUrl() + "/api/customers";
                case "product"  -> properties.getInventoryUrl() + "/api/products";
                case "order"    -> properties.getOrderUrl() + "/api/orders";
                case "payment"  -> properties.getPaymentUrl() + "/api/payments";
                default -> throw new IllegalArgumentException("Unknown view: " + viewName);
            };
        }
    }
    

    That resolveUrl switch is the only place that knows which service owns which view. Every tool delegates to ServiceClient rather than making HTTP calls directly.

    The ServiceClientOperations interface exists because Mockito’s inline mock maker on Java 25 cannot mock concrete classes. We hit this wall across the framework — the solution every time was to extract an interface so tests can mock it. It’s a slightly annoying pattern, but it works.

    A Tool Implementation

    Each tool is a Spring @Service with a @Tool-annotated method. Here’s QueryViewTool:

    @Service
    public class QueryViewTool {
    
        private final ServiceClientOperations serviceClient;
    
        @Tool(description = "Query a materialized view. "
                + "Available views: customer, product, order, payment. "
                + "Provide a key to get a specific entity, or omit to list entities.")
        public String queryView(
                @ToolParam(description = "View to query: customer, product, order, or payment")
                String viewName,
                @ToolParam(description = "Optional: specific entity ID", required = false)
                String key,
                @ToolParam(description = "Max results when listing (default: 10)", required = false)
                Integer limit) {
    
            if (key != null && !key.isBlank()) {
                return toJson(serviceClient.getEntity(viewName, key));
            } else {
                int effectiveLimit = (limit != null && limit > 0) ? limit : 10;
                List<Map<String, Object>> results = serviceClient.listEntities(viewName, effectiveLimit);
                return toJson(Map.of(
                        "view", viewName,
                        "count", results.size(),
                        "entities", results
                ));
            }
        }
    }
    

    That @Tool description is doing real work. The AI reads it to decide which tool to call and what parameters to provide. If you’re vague — “query data” instead of “Query a materialized view. Available views: customer, product, order, payment” — the AI picks the wrong tool or provides wrong parameters. We learned this the hard way. Be specific. Name the available views. Explain what happens with versus without a key.

    The optional parameters with defaults matter too. When the AI omits key, the tool lists entities. When it omits limit, you get 10. This lets a single tool handle “show me all customers” and “look up customer cust-123” without the AI having to supply every parameter on every call.

    Tool Registration

    All ten tools get registered in one place:

    @Configuration
    public class McpToolConfig {
    
        @Bean
        public ToolCallbackProvider mcpTools(QueryViewTool queryView,
                                             SubmitEventTool submitEvent,
                                             GetEventHistoryTool getEventHistory,
                                             InspectSagaTool inspectSaga,
                                             ListSagasTool listSagas,
                                             GetMetricsTool getMetrics,
                                             RunDemoTool runDemo,
                                             ListDlqEntriesTool listDlqEntries,
                                             InspectDlqEntryTool inspectDlqEntry,
                                             ReplayDlqEntryTool replayDlqEntry) {
            return MethodToolCallbackProvider.builder()
                    .toolObjects(queryView, submitEvent, getEventHistory,
                            inspectSaga, listSagas, getMetrics, runDemo,
                            listDlqEntries, inspectDlqEntry, replayDlqEntry)
                    .build();
        }
    }
    

    Spring AI’s MethodToolCallbackProvider scans each object for @Tool methods and registers them with the MCP server. When the AI calls tools/list, it gets back all ten tool definitions with their descriptions and parameter schemas.


    The Event Dispatch Pattern

    SubmitEventTool deserves a closer look because it maps a single tool to seven different service endpoints:

    Map<String, Object> dispatch(String eventType, Map<String, Object> payload) {
        return switch (eventType) {
            case "CreateCustomer"  -> serviceClient.createEntity("customer", payload);
            case "CreateProduct"   -> serviceClient.createEntity("product", payload);
            case "CreateOrder"     -> serviceClient.createEntity("order", payload);
            case "CancelOrder"     -> {
                String orderId = requireField(payload, "orderId");
                yield serviceClient.performAction("order", orderId, "cancel", payload, true);
            }
            case "ReserveStock"    -> {
                String productId = requireField(payload, "productId");
                yield serviceClient.performAction("product", productId, "stock/reserve", payload, false);
            }
            case "ProcessPayment"  -> serviceClient.createEntity("payment", payload);
            case "RefundPayment"   -> {
                String paymentId = requireField(payload, "paymentId");
                yield serviceClient.performAction("payment", paymentId, "refund", payload, false);
            }
            default -> throw new IllegalArgumentException("Unknown event type: " + eventType);
        };
    }
    

    The alternative would be seven separate tools — create_customer, create_product, and so on. We went with a single submit_event tool with an eventType discriminator because it mirrors the event sourcing model (the system is event-driven, the tool should feel event-driven), it keeps the total tool count at ten instead of sixteen, and the AI handles the dispatch naturally. When you say “create a customer named Alice,” it maps that to eventType=”CreateCustomer” without difficulty.
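The dispatch code leans on a requireField helper that isn't shown. A minimal version might look like the following; this is a hypothetical sketch (the class name, the blank check, and the error message are assumptions), not the framework's actual helper.

```java
import java.util.Map;

// Hypothetical helper: the dispatch code calls requireField, but its body is not shown.
class PayloadFields {

    // Returns the field as a string, or fails fast with a message naming the missing field.
    static String requireField(Map<String, Object> payload, String field) {
        Object value = payload.get(field);
        if (value == null || value.toString().isBlank()) {
            throw new IllegalArgumentException("Missing required field: " + field);
        }
        return value.toString();
    }

    public static void main(String[] args) {
        System.out.println(requireField(Map.of("orderId", "ord-12345"), "orderId"));
        // prints ord-12345
    }
}
```

Failing with a message that names the field gives the AI something it can act on: it reads the error, adds the missing value, and calls the tool again.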


    The Demo Tool

    RunDemoTool is the most complex tool because each scenario chains multiple service calls:

    private Map<String, Object> runHappyPath() {
        // Step 1: Create customer
        Map<String, Object> customer = serviceClient.createEntity("customer", Map.of(
                "name", "Demo Customer",
                "email", "demo-" + shortId() + "@example.com",
                "address", "123 Demo Street"
        ));
    
        // Step 2: Create product
        Map<String, Object> product = serviceClient.createEntity("product", Map.of(
                "sku", "DEMO-" + shortId(),
                "name", "Demo Widget",
                "price", "29.99",
                "quantityOnHand", 100
        ));
    
        // Step 3: Create order (uses IDs from previous steps)
        String customerId = extractId(customer, "customerId");
        String productId = extractId(product, "productId");
        Map<String, Object> order = serviceClient.createEntity("order", Map.of(
                "customerId", customerId,
                "customerName", "Demo Customer",
                "lineItems", List.of(Map.of(
                        "productId", productId,
                        "productName", "Demo Widget",
                        "quantity", 2,
                        "unitPrice", 29.99
                ))
        ));
    
        return Map.of("scenario", "happy_path", "steps", List.of(...));
    }
    

    Each scenario uses shortId() — a UUID fragment — so you can run the same scenario multiple times without naming collisions. The payment_failure scenario creates a $16,500 order that exceeds the $10,000 payment limit, triggering saga compensation. The saga_timeout scenario creates an order with minimal stock, designed to hit the deadline. These are pre-built investigation targets — the AI equivalent of a test fixture.
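For reference, a shortId() along these lines is only a few lines of Java. The sketch below is an assumption about its shape: the article says only that it is a UUID fragment, so the 8-character length and the ShortIds class name are illustrative.

```java
import java.util.UUID;

// Hypothetical sketch of shortId(); the 8-character length is an assumption.
class ShortIds {

    // First 8 hex characters of a random UUID: short enough to read, random enough for demos.
    static String shortId() {
        return UUID.randomUUID().toString().substring(0, 8);
    }

    public static void main(String[] args) {
        System.out.println("DEMO-" + shortId());
    }
}
```

Eight hex characters give about four billion possible values, which is plenty for repeated demo runs but not a substitute for a full UUID where uniqueness really matters.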


    Stdio vs. SSE: Two Transport Modes

    Default: stdio (Local Development)

    # application.properties
    spring.main.web-application-type=none
    spring.ai.mcp.server.name=ecommerce-mcp-server

    The AI assistant launches the server as a subprocess and communicates via stdin/stdout using JSON-RPC:

    stdio transport: Claude Code spawns the MCP server as a java -jar subprocess and communicates over stdin and stdout using JSON-RPC 2.0

    No network port needed. This is the default for local development with Claude Code or Claude Desktop.

    Docker: SSE/HTTP (Networked Deployment)

    # application-docker.properties
    spring.main.web-application-type=servlet
    spring.ai.mcp.server.stdio=false
    server.port=8085

    In Docker, the MCP server runs as a web service with Server-Sent Events on port 8085:

    mcp-server:
      build: ../mcp-server
      ports:
        - "8085:8085"
      environment:
        - SPRING_PROFILES_ACTIVE=docker
        - MCP_SERVICES_ACCOUNT_URL=http://account-service:8081
        - MCP_SERVICES_INVENTORY_URL=http://inventory-service:8082
        - MCP_SERVICES_ORDER_URL=http://order-service:8083
        - MCP_SERVICES_PAYMENT_URL=http://payment-service:8084

    The profile switch is the only difference between the two modes. Same tool code, same behavior.


    Testing

    Each tool has unit tests that mock ServiceClientOperations:

    @ExtendWith(MockitoExtension.class)
    class QueryViewToolTest {
    
        @Mock
        private ServiceClientOperations serviceClient;
    
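        // Used below to parse the tool's JSON response
        private final ObjectMapper objectMapper = new ObjectMapper();
    
        private QueryViewTool queryViewTool;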
    
        @BeforeEach
        void setUp() {
            queryViewTool = new QueryViewTool(serviceClient);
        }
    
        @Test
        void shouldQueryByKey() throws JsonProcessingException {
            when(serviceClient.getEntity("customer", "c1"))
                    .thenReturn(Map.of("customerId", "c1", "name", "Alice"));
    
            String result = queryViewTool.queryView("customer", "c1", null);
    
            verify(serviceClient).getEntity("customer", "c1");
            Map<String, Object> parsed = objectMapper.readValue(result, new TypeReference<>() {});
            assertNotNull(parsed.get("customerId"));
        }
    }
    

    Eleven test classes cover all ten tools plus the ServiceClient. Add another six for the security layer (more on that below) and one integration suite, and the mcp-server module sits at 143 tests total.

    Integration tests use Spring’s ApplicationContextRunner to verify bean wiring without starting the MCP stdio transport (which would block in a test environment):

    @DisplayName("MCP Tool Integration")
    class McpToolIntegrationTest {
    
        private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
                .withConfiguration(AutoConfigurations.of(McpToolConfig.class))
                .withUserConfiguration(TestServiceClientConfig.class)
                .withBean(McpServerProperties.class);
    
        @Test
        void shouldCreateAllToolBeans() {
            contextRunner.run(context -> {
                assertThat(context).hasSingleBean(QueryViewTool.class);
                assertThat(context).hasSingleBean(SubmitEventTool.class);
                // ... all 10 tools
            });
        }
    
        @Test
        void shouldRegisterToolCallbackProvider() {
            contextRunner.run(context -> {
                ToolCallbackProvider provider = context.getBean(ToolCallbackProvider.class);
                assertThat(provider.getToolCallbacks()).hasSize(10);
            });
        }
    }
    

    Configuration

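    Beyond the transport and security settings covered elsewhere in this post, the MCP server has exactly four configuration properties, one base URL per backend service: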

    mcp.services.account-url=http://localhost:8081
    mcp.services.inventory-url=http://localhost:8082
    mcp.services.order-url=http://localhost:8083
    mcp.services.payment-url=http://localhost:8084

    In Docker, these are overridden by environment variables pointing to container hostnames. That’s it. No Hazelcast configuration, no cluster membership, no pipeline setup.


    Setting Up the AI Assistant

    Claude Code

    Add to ~/.claude/claude_code_config.json:

    {
      "mcpServers": {
        "hazelcast-ecommerce": {
          "command": "java",
          "args": ["-jar", "mcp-server/target/mcp-server-1.0.0-SNAPSHOT.jar"]
        }
      }
    }
    

    Claude Desktop

    Add to claude_desktop_config.json:

    {
      "mcpServers": {
        "hazelcast-ecommerce": {
          "command": "java",
          "args": ["-jar", "/absolute/path/to/mcp-server-1.0.0-SNAPSHOT.jar"]
        }
      }
    }
    

    Docker (SSE)

    Point the AI assistant to the SSE endpoint: http://localhost:8085/mcp

    In all three cases, the AI discovers the ten tools via tools/list and starts using them immediately.


    A Real Conversation

    Here’s what a debugging session looks like with MCP connected:

    You: “Something’s wrong with recent orders. Can you check?”

    Assistant: calls list_sagas(status="FAILED")

    calls get_metrics()

    “I found 2 failed sagas out of 142 total. Your overall success rate is 94.4%. Let me investigate the failures.”

    calls inspect_saga(sagaId="saga-fail-1")

    calls inspect_saga(sagaId="saga-fail-2")

    “Both failures are payment rejections:

    1. saga-fail-1 (Order ORD-088): Total $15,000 exceeded the $10,000 payment limit. Stock was released via compensation.
    2. saga-fail-2 (Order ORD-091): Total $16,500, same payment limit issue. Stock also released.

    These aren’t bugs — the payment service is correctly rejecting orders above the limit. You might want to either raise the limit or add client-side validation to prevent orders over $10,000 from being submitted.”

    Four tool calls, zero curl commands, and a root-cause analysis with a recommendation. From one question. I’ll be honest: the first time I watched the AI chain together the right sequence of calls and arrive at a correct diagnosis, it felt a little eerie. Like watching someone drive your car better than you do.


    Authentication and Tool Authorization

    The first version of this server had no authentication, which is fine for local development and obviously not fine for anything else. So we’ve added API key authentication and role-based tool access — disabled by default to preserve backward compatibility, and enabled with a single property when you need it.

    mcp:
      security:
        enabled: true
        api-keys:
          viewer-key-12345: VIEWER
          operator-key-67890: OPERATOR
          admin-key-99999: ADMIN

    In HTTP/SSE mode the key arrives in the X-API-Key request header. In stdio mode it’s read from the MCP_API_KEY environment variable. Either way, the server resolves the key to a role, and a ToolAuthorizer checks whether the role is permitted to invoke the tool the AI just asked for.

    Three roles are defined:

    • VIEWER — Read-only. Can call query_view, get_event_history, inspect_saga, list_sagas, get_metrics, list_dlq_entries, and inspect_dlq_entry. Cannot modify state.
    • OPERATOR — Read plus write. Adds submit_event, run_demo, and replay_dlq_entry.
    • ADMIN — Same as OPERATOR today, reserved for future admin-only tools.

    run_demo is a good example of why the role split matters — it’s the kind of tool you absolutely do not want firing in production, and the default VIEWER key keeps that off the table. The viewer can do everything an SRE wants to do during an incident — query, inspect, look at metrics — but it can’t accidentally place an order.
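
    To make the role check concrete, here is a minimal sketch of how a key-to-role lookup and a per-role tool allowlist could fit together. The class name ToolAuthorizerSketch, its constructor, and isAllowed() are assumptions for illustration, not the framework’s actual ToolAuthorizer; only the role names and tool names come from the list above.

    ```java
    import java.util.EnumMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical sketch of role-based tool authorization; not the framework's ToolAuthorizer.
    final class ToolAuthorizerSketch {

        enum Role { VIEWER, OPERATOR, ADMIN }

        private static final Set<String> READ_TOOLS = Set.of(
                "query_view", "get_event_history", "inspect_saga", "list_sagas",
                "get_metrics", "list_dlq_entries", "inspect_dlq_entry");

        private static final Set<String> WRITE_TOOLS = Set.of(
                "submit_event", "run_demo", "replay_dlq_entry");

        private final Map<Role, Set<String>> allowed = new EnumMap<>(Role.class);
        private final Map<String, Role> apiKeys;

        ToolAuthorizerSketch(Map<String, Role> apiKeys) {
            this.apiKeys = Map.copyOf(apiKeys);
            Set<String> readWrite = new HashSet<>(READ_TOOLS);
            readWrite.addAll(WRITE_TOOLS);
            allowed.put(Role.VIEWER, READ_TOOLS);
            allowed.put(Role.OPERATOR, Set.copyOf(readWrite));
            allowed.put(Role.ADMIN, Set.copyOf(readWrite)); // same as OPERATOR today
        }

        // Unknown or missing keys resolve to no role, so every tool call is denied.
        boolean isAllowed(String apiKey, String toolName) {
            if (apiKey == null || toolName == null) {
                return false;
            }
            Role role = apiKeys.get(apiKey);
            return role != null && allowed.get(role).contains(toolName);
        }
    }
    ```

    With the sample keys from the YAML above, a viewer key passes for list_sagas and fails for run_demo, which is exactly the split you want during an incident.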

    One layer is still missing: the MCP server authenticates its callers, but it doesn’t forward caller identity to the downstream microservices. For a real production deployment you’d want both. We’ll come back to that.


    Where This Goes Next

    A few directions we haven’t explored yet.

    MCP supports streaming responses, which we’d want for large result sets — listing thousands of events as a single JSON blob isn’t great. MCP also has resources, read-only data endpoints that the AI can reference as context without explicitly calling a tool. The materialized views are a natural fit for that.

    OAuth forwarding is the gap mentioned above — the MCP server’s caller identity needs to propagate down to the backend services if we want end-to-end auth in production. The plumbing exists in Spring Security; we just haven’t wired it up.

    And with the MCP server as a foundation, you could build specialized AI agents — an operations agent that monitors sagas and flags anomalies, a demo agent that walks users through the system, a testing agent that creates targeted test data and verifies compensation paths. We haven’t built any of these yet, but the tool layer is there.


    The MCP server adds a natural-language interface to everything we’ve built so far. Ten tools, a thin REST proxy, two transport modes, role-based authorization, 143 tests. It doesn’t add new capabilities to the data layer — it makes the existing capabilities accessible through conversation. And that turns out to matter more than it sounds like it should. The investigation that took five curl commands now takes one sentence. The demo that required a script and documentation now requires “show me the happy path.” The system that was only inspectable by people who knew the API endpoints is now inspectable by anyone who can ask a question.

    That’s where we’ll leave things for today.


    Next up: Circuit Breakers and Retry for Saga Resilience

    Previous: Vector Similarity Search with Hazelcast

    Code: github.com/myawnhc/hazelcast-microservices-framework — clone it, docker-compose up, and the framework boots locally with sample data.
  • On the Vector Store I Didn’t Ask For

    A short interstitial in the “Building Event-Driven Microservices with Hazelcast” series


    AI has been instrumental in bringing this project to fruition — I’m not making any secret of that. The first three posts in this series describe work that was largely pre-existing demo code: domain objects, the Jet pipeline, the materialized view machinery. Claude polished what was already there and helped me write about it. Honest work, but mostly cleanup.

    The saga post (post 4) marked a shift — that’s where the demo’s functionality moved into genuinely new territory. And because Hazelcast had recently added a VectorCollection data structure and vector search capability — still in beta at the time — I was eager to incorporate it. So I asked Claude to design and implement something. I should have kept a close eye at every stage; instead I took more of an “I’ll review everything when you’re done” approach.

    I was in for a surprise.

    What came back was a working vector search implementation. What did not come back was anything built on Hazelcast’s VectorCollection. Claude had built one from scratch — an IMap<String, float[]> for the embeddings, brute-force cosine similarity at query time. No HNSW indexing, no clever data structure, just compute the distance to every vector and sort the results. It worked. The “similar products” endpoint returned plausibly similar products.

    This is exactly the thing creating so much fear and doomsaying around AI in the industry. If a coding assistant can reproduce the functionality of an Enterprise software feature — Enterprise edition, additional license cost — in a few hours, is all enterprise software an endangered species?

    Not quite. Brute-force cosine similarity is O(n) per query — fine for a demo catalog, fine for a small product line, but not the same animal as Hazelcast’s Enterprise VectorCollection, which uses HNSW indexing to stay sub-millisecond at millions of vectors. That’s real engineering, and it took the Hazelcast team a lot longer than a few hours.

    What’s more interesting is that I ended up with both. The accidental implementation became the Community Edition fallback in the framework. The Enterprise implementation took over once I corrected course and built what I’d originally asked for. So the framework now has a VectorStoreService interface with two backends — Enterprise gets HNSW, Community gets brute force, and both work. The Community story is no longer “vector search doesn’t work without a license”; it’s “vector search works fine for modest workloads without a license, and scales seriously if you upgrade.”

    I’m not sure I’d have ended up there if Claude had built what I asked for the first time.

    Code: github.com/myawnhc/hazelcast-microservices-framework — clone it, docker-compose up, and the framework boots locally with sample data.
  • Vector Similarity Search on Hazelcast with HNSW

    Part 5 in the “Building Event-Driven Microservices with Hazelcast” series


    So far we’ve built an event sourcing framework, a Jet pipeline, materialized views, and a saga pattern for distributed transactions. All of that gives us a solid eCommerce backend where orders flow through services, stock gets reserved, payments get processed, and everything recovers gracefully when something goes wrong.

    Now we’re going to add something different: “Find me products similar to this one.”

    You’ve seen this everywhere. Netflix’s “Because you watched…” Spotify’s discovery playlists. Amazon’s “Customers who bought this also bought…” These features run on vector embeddings — numerical representations of items positioned in high-dimensional space so that similar items cluster together. It sounds exotic, but by the end of this post we’ll have it working in our framework with a real embedding model running locally, no API keys required.

    Why Not Just Use Full-Text Search?

    If you’ve been doing this for a while, your reflex for “find similar products” is probably full-text search. Elasticsearch, Solr, maybe Postgres full-text indexes. And those are genuinely good tools for what they do — if someone types “gaming laptop,” full-text search finds documents containing the words “gaming” and “laptop.”

    But try searching for “portable computer for games.” Or “high-performance notebook for esports.” Semantically identical. Zero shared keywords. Full-text search won’t connect them because it’s matching tokens, not meaning.

    Embedding-based similarity works at a different level entirely. A trained model — we’re using all-MiniLM-L6-v2 — has learned from millions of text pairs that “gaming laptop” and “portable computer for games” mean the same thing. It places them near each other in vector space regardless of whether the words overlap. The model doesn’t care about your vocabulary. It cares about your intent.

    In production you’d probably combine both approaches: full-text for keyword lookups and structured queries, vector similarity for the “more like this” discovery path. But for recommendations and product discovery, embeddings are the right tool.


    What Are Vector Embeddings?

    A vector embedding is a fixed-size array of floats that captures an item’s semantic characteristics. Items with similar meaning end up with vectors pointing in similar directions:

    Figure: a 2D projection of a 384-dimension semantic vector space. Gaming Laptop, Gaming Desktop, and the phrase "portable computer for games" cluster together with high cosine similarity, while Running Shoes sits far away with low similarity.

    “Gaming Laptop” and “Gaming Desktop” are nearby in 384-dimensional space. “Running Shoes” is off in a different neighborhood. You measure similarity by computing the cosine of the angle between two vectors — vectors pointing the same direction score close to 1.0, perpendicular vectors score 0, opposite vectors score -1.0.
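
    If the cosine measure feels abstract, it is only a dot product divided by the two vector lengths. Here is a minimal sketch; the class and method names are illustrative, and returning 0 for a zero-length vector is a convention chosen for this example rather than anything the framework prescribes.

    ```java
    // Minimal cosine similarity sketch; names are illustrative, not from the framework.
    final class CosineSimilarity {

        private CosineSimilarity() {
        }

        static double cosine(float[] a, float[] b) {
            if (a.length != b.length) {
                throw new IllegalArgumentException("Vectors must have the same dimension");
            }
            double dot = 0.0;
            double normA = 0.0;
            double normB = 0.0;
            for (int i = 0; i < a.length; i++) {
                dot += (double) a[i] * b[i];
                normA += (double) a[i] * a[i];
                normB += (double) b[i] * b[i];
            }
            if (normA == 0.0 || normB == 0.0) {
                return 0.0; // a zero vector has no direction; treated as "unrelated" here
            }
            return dot / (Math.sqrt(normA) * Math.sqrt(normB));
        }
    }
    ```

    Note that the magnitudes cancel out: scaling a vector does not change its score, only its direction does.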

    Making Embeddings Pluggable

    We don’t want the framework married to a specific embedding model. Maybe you’re fine with the default local model. Maybe you need OpenAI’s embeddings for higher accuracy on your domain. Maybe you’ve trained your own. So embedding generation is behind an interface:

    public interface EmbeddingProvider {
        float[] embed(String text);
        int getDimension();
        String getModelName();
    }
    

    embed() takes text, returns a vector. getDimension() is there so callers can verify compatibility with the vector store’s configured dimension: if you swap models and forget to update the config, you want a clear error, not silent data corruption. getModelName() is just for logging.
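
    A startup check built on getDimension() might look like the sketch below. The DimensionCheck class and its verify() method are hypothetical, and the interface is redeclared only so the example compiles on its own; where the framework actually performs this check isn’t shown in this post.

    ```java
    // Hypothetical fail-fast check; EmbeddingProvider is redeclared so the sketch is self-contained.
    final class DimensionCheck {

        interface EmbeddingProvider {
            float[] embed(String text);
            int getDimension();
            String getModelName();
        }

        private DimensionCheck() {
        }

        // Throws with a readable message when the model and the store disagree on vector size.
        static void verify(EmbeddingProvider provider, int configuredDimension) {
            if (provider.getDimension() != configuredDimension) {
                throw new IllegalStateException(String.format(
                        "Embedding model %s produces %d-dimension vectors, "
                                + "but the vector store is configured for %d",
                        provider.getModelName(), provider.getDimension(), configuredDimension));
            }
        }
    }
    ```

    Running a check like this once at startup turns a confusing index error at query time into a one-line message that names the model and both dimensions.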

    The Default Model

    Out of the box, the framework uses LangChain4j’s all-MiniLM-L6-v2. It’s a sentence transformer that runs locally via ONNX Runtime — no API key, no external service, no per-call cost. It produces 384-dimension vectors and captures genuine semantic similarity.

    public class LangChain4jEmbeddingProvider implements EmbeddingProvider {
    
        private final AllMiniLmL6V2EmbeddingModel model;
    
        public LangChain4jEmbeddingProvider() {
            this.model = new AllMiniLmL6V2EmbeddingModel();
        }
    
        @Override
        public float[] embed(final String text) {
            return model.embed(text).content().vector();
        }
    
        @Override
        public int getDimension() {
            return 384;
        }
    
        @Override
        public String getModelName() {
            return "all-MiniLM-L6-v2";
        }
    }
    

    The ONNX runtime takes about a second to load on first call, then it’s thread-safe and fast. The auto-configuration registers the provider under @ConditionalOnMissingBean, so if you define your own EmbeddingProvider bean, the default steps aside.

    I want to be clear: this is a real model, not a demo placeholder. “Gaming Laptop” and “Portable Computer for Games” genuinely show up as similar even though they share almost no words.


    HNSW: Searching Vectors Without Scanning Everything

    Once you have vectors, you need to search them. The brute-force approach compares your query vector against every stored vector — O(n) per query. Fine for hundreds of products. Not fine for a million.

    HNSW (Hierarchical Navigable Small World) is the standard answer. It builds a multi-layer graph over your vector space; think of it like a skip list but for geometric proximity. The top layers have sparse, long-range connections for coarse navigation. The bottom layers have dense, short-range connections for precision. You search by starting at the top, greedily navigating toward the query vector, then dropping down to finer layers. The result is approximate nearest-neighbor search in roughly O(log n) time with high recall.

    There are three tuning knobs:

    Parameter       | Controls                                                                | Default
    maxDegree (M)   | Max edges per graph node                                                | 16
    efConstruction  | Beam width during index build; higher means better recall, slower build | 200
    metric          | Distance function: COSINE, DOT, or EUCLIDEAN                            | COSINE

    For a product catalog, the defaults are fine. You’d tune these if you were indexing millions of items and needed to trade off recall against memory or build time.


    Storing and Searching Vectors

    The vector store is exposed through VectorStoreService:

    public interface VectorStoreService {
    
        void storeEmbedding(String id, float[] embedding, Map<String, Object> metadata);
    
        List<SimilarityResult> findSimilar(float[] queryVector, int limit);
    
        List<SimilarityResult> findSimilarById(String id, int limit);
    
        boolean isAvailable();
    
        String getImplementationType();
    }
    

    Notice it takes float[]. The vector store doesn’t know or care how the embeddings were generated — the EmbeddingProvider produces vectors, the VectorStoreService stores and searches them. Two concerns, cleanly separated.

    SimilarityResult is just a record: (String id, float score, Map<String, Object> metadata).

    The Enterprise Path: VectorCollection

    Hazelcast Enterprise has a native VectorCollection data structure with built-in HNSW indexing. Our HazelcastVectorStoreService wraps it:

    public HazelcastVectorStoreService(HazelcastInstance hazelcast,
                                       VectorStoreProperties properties) {
        this.indexName = properties.getIndexName();
    
        Metric metric = Metric.valueOf(properties.getMetric().toUpperCase());
    
        VectorIndexConfig indexConfig = new VectorIndexConfig()
                .setName(indexName)
                .setDimension(properties.getDimension())
                .setMetric(metric)
                .setMaxDegree(properties.getMaxConnections())
                .setEfConstruction(properties.getEfConstruction());
    
        VectorCollectionConfig collectionConfig =
                new VectorCollectionConfig(properties.getCollectionName())
                        .addVectorIndexConfig(indexConfig);
    
        hazelcast.getConfig().addVectorCollectionConfig(collectionConfig);
        this.collection = VectorCollection.getCollection(
                hazelcast, properties.getCollectionName());
    }
    

    Storing an embedding wraps the metadata and vector into a VectorDocument:

    @Override
    public void storeEmbedding(String id, float[] embedding,
                               Map<String, Object> metadata) {
        String jsonMetadata = metadataToJson(metadata);
        VectorDocument<String> document = VectorDocument.of(
                jsonMetadata,
                VectorValues.of(indexName, embedding)
        );
        collection.putAsync(id, document).toCompletableFuture().join();
    }
    

    And searching is a single async call to the HNSW index:

    @Override
    public List<SimilarityResult> findSimilar(float[] queryVector, int limit) {
        SearchResults<String, String> searchResults = collection.searchAsync(
                VectorValues.of(indexName, queryVector),
                SearchOptions.builder()
                        .limit(limit)
                        .includeValue()
                        .build()
        ).toCompletableFuture().join();
    
        List<SimilarityResult> results = new ArrayList<>();
        for (SearchResult<String, String> hit : searchResults) {
            Map<String, Object> metadata = jsonToMetadata(hit.getValue());
            results.add(new SimilarityResult(hit.getKey(), hit.getScore(), metadata));
        }
        return results;
    }
    

    For comparison — brute-force IMap scans are O(n) per query; HNSW is O(log n). For 1,000 products the difference is negligible. For 1,000,000, it’s the difference between usable and not. That same O(n) IMap scan is exactly what the Community fallback uses, which we’ll look at next.

    The Community Path: Brute Force, but It Works

    Not everyone has Hazelcast Enterprise. The Community fallback is a SimpleVectorStoreService that stores embeddings in an ordinary Hazelcast IMap and answers queries with a brute-force O(n) cosine scan over every stored vector. The EmbeddingProvider still runs — it’s local ONNX, no license needed — and now the vectors it produces actually get stored and searched. It reports isAvailable() = true, so the similarity endpoint returns real results. It’s less scalable than the Enterprise HNSW path — that O(n) scan grows linearly with the catalog — but for development, testing, and small-to-moderate catalogs it’s fully operational.
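
    The Community fallback’s source isn’t reproduced in this post, so here is a rough sketch of what an O(n) cosine scan amounts to. A plain java.util.Map stands in for the Hazelcast IMap, and BruteForceScan, Hit, and findSimilar() are names invented for this example.

    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Sketch of a brute-force scan; a plain Map stands in for the Hazelcast IMap.
    final class BruteForceScan {

        record Hit(String id, double score) {
        }

        private BruteForceScan() {
        }

        // Scores every stored vector against the query, then keeps the top `limit` hits.
        static List<Hit> findSimilar(Map<String, float[]> embeddings, float[] query, int limit) {
            List<Hit> hits = new ArrayList<>();
            for (Map.Entry<String, float[]> entry : embeddings.entrySet()) {
                hits.add(new Hit(entry.getKey(), cosine(query, entry.getValue())));
            }
            hits.sort((x, y) -> Double.compare(y.score(), x.score())); // highest score first
            int count = Math.min(Math.max(limit, 0), hits.size());
            return List.copyOf(hits.subList(0, count));
        }

        private static double cosine(float[] a, float[] b) {
            double dot = 0.0;
            double normA = 0.0;
            double normB = 0.0;
            for (int i = 0; i < a.length; i++) {
                dot += (double) a[i] * b[i];
                normA += (double) a[i] * a[i];
                normB += (double) b[i] * b[i];
            }
            return (normA == 0.0 || normB == 0.0) ? 0.0 : dot / Math.sqrt(normA * normB);
        }
    }
    ```

    Every query touches every entry, which is why the cost grows linearly with the catalog. For a few thousand products that loop is cheap; the sort is the only part that is more than a single pass.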


    How It All Wires Together

    The module layout keeps the Enterprise dependency isolated:

    framework-core/                          (always built)
      └── EmbeddingProvider (interface)
      └── LangChain4jEmbeddingProvider (default, all-MiniLM-L6-v2)
      └── VectorStoreService (interface)
      └── SimpleVectorStoreService (Community fallback, IMap + O(n) cosine scan)
      └── VectorStoreAutoConfiguration
    
    framework-enterprise/                    (only with -Penterprise)
      └── HazelcastVectorStoreService (VectorCollection + HNSW)
      └── EnterpriseVectorStoreAutoConfiguration
    

    The Enterprise auto-configuration uses @AutoConfigureBefore so it registers first. If it creates a HazelcastVectorStoreService bean, the core @ConditionalOnMissingBean sees it and skips the Community fallback. If the Enterprise module isn’t on the classpath — which is the default — the SimpleVectorStoreService takes over.

    @Configuration
    @EnableConfigurationProperties(VectorStoreProperties.class)
    @ConditionalOnBean(HazelcastInstance.class)
    public class VectorStoreAutoConfiguration {
    
        @Bean
        @ConditionalOnMissingBean(EmbeddingProvider.class)
        public EmbeddingProvider embeddingProvider() {
            return new LangChain4jEmbeddingProvider();
        }
    
        @Bean
        @ConditionalOnMissingBean(VectorStoreService.class)
        public VectorStoreService vectorStoreService(HazelcastInstance hazelcast,
                                                     VectorStoreProperties properties) {
            return new SimpleVectorStoreService(hazelcast, properties);
        }
    }
    

    Build with mvn clean install for Community (the default). Build with mvn clean install -Penterprise to include the Enterprise module. The runtime figures out the rest.

    This is the same edition-aware pattern we use for other Enterprise-only features — CP Subsystem, HD Memory, TLS. Define the interface in framework-core, put the Enterprise implementation in framework-enterprise, let Spring’s auto-configuration ordering handle the selection. Community Edition always works. Enterprise features activate when you add the module and license.


    From Product Creation to Searchable Embedding

    When a product is created, the InventoryService builds a text representation from its name, description, and category, then asks the EmbeddingProvider to turn that into a vector:

    private void storeProductEmbedding(final Product product) {
        StringBuilder text = new StringBuilder();
        text.append(product.getName());
        if (product.getDescription() != null) {
            text.append(" ").append(product.getDescription());
        }
        if (product.getCategory() != null) {
            text.append(" ").append(product.getCategory());
        }
    
        float[] embedding = embeddingProvider.embed(text.toString());
    
        Map<String, Object> metadata = new HashMap<>();
        metadata.put("name", product.getName());
        metadata.put("category", product.getCategory());
    
        vectorStoreService.storeEmbedding(product.getProductId(), embedding, metadata);
    }
    

    The whole thing is wrapped in a try/catch as a best-effort operation. On either edition the embedding gets stored — in the Enterprise VectorCollection or the Community IMap — and if the vector store somehow isn’t available, the failure is swallowed so product creation still succeeds. The similarity feature is additive, not load-bearing.
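
    The try/catch itself isn’t shown in the snippet above, so here is one way the best-effort wrapping could look. BestEffort.run() is a hypothetical helper written for this example, not a framework class; in the real service the catch block presumably just logs a warning.

    ```java
    import java.util.function.Consumer;

    // Hypothetical best-effort helper; not part of the framework.
    final class BestEffort {

        private BestEffort() {
        }

        // Runs the action and reports whether it succeeded.
        // Runtime failures go to onFailure instead of propagating to the caller.
        static boolean run(Runnable action, Consumer<RuntimeException> onFailure) {
            try {
                action.run();
                return true;
            } catch (RuntimeException e) {
                onFailure.accept(e);
                return false;
            }
        }
    }
    ```

    A call site would then read something like BestEffort.run(() -> storeProductEmbedding(product), e -> logger.warn(...)), and product creation carries on whether or not the embedding was stored.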


    The REST Endpoint

    The Inventory Service exposes a similarity search endpoint:

    @GetMapping("/{productId}/similar")
    public ResponseEntity<SimilarProductsResponse> findSimilarProducts(
            @PathVariable String productId,
            @RequestParam(defaultValue = "5") int limit) {
    
        if (!productService.productExists(productId)) {
            return ResponseEntity.notFound().build();
        }
    
        if (!vectorStoreService.isAvailable()) {
            return ResponseEntity.ok(new SimilarProductsResponse(
                productId, false,
                vectorStoreService.getImplementationType(),
                "Vector similarity search is not available.",
                List.of()
            ));
        }
    
        List<SimilarityResult> results = vectorStoreService.findSimilarById(productId, limit);
    
        List<ProductDTO> similarProducts = new ArrayList<>();
        for (SimilarityResult result : results) {
            productService.getProduct(result.id())
                .ifPresent(product -> similarProducts.add(product.toDTO()));
        }
    
        return ResponseEntity.ok(new SimilarProductsResponse(
            productId, true,
            vectorStoreService.getImplementationType(),
            "Found " + similarProducts.size() + " similar products",
            similarProducts
        ));
    }
    

    The response shape is the same regardless of edition: the client gets vectorStoreAvailable: true/false and the getImplementationType() string so it knows which backend answered. Both editions return real similar products with similarity scores; the difference is underneath. Enterprise serves them from an HNSW index in roughly O(log n), while Community runs the brute-force O(n) IMap scan. Because HNSW is an approximate index, the two can occasionally rank borderline neighbors differently, but for practical purposes it’s the same results with different scaling characteristics.


    Configuration

    All the vector store parameters live under framework.vectorstore in your application YAML:

    framework:
      vectorstore:
        collection-name: product-vectors
        dimension: 384               # Must match EmbeddingProvider.getDimension()
        max-connections: 16          # HNSW maxDegree (M)
        ef-construction: 200         # HNSW build beam width
        metric: COSINE               # COSINE, DOT, or EUCLIDEAN
        index-name: default          # HNSW index name
    

    The dimension has to match whatever your EmbeddingProvider produces. The default (384) matches all-MiniLM-L6-v2. If you swap in OpenAI’s text-embedding-3-small (1536 dimensions), update this property or you’ll get index errors that are confusing to debug.


    Trying It Out

    With the Docker stack running:

    # Load sample products (creates embeddings automatically)
    ./scripts/load-sample-data.sh
    
    # Find products similar to a known product
    curl "http://localhost:8082/api/products/<product-id>/similar?limit=5"
    
    # Or run the demo scenario
    ./scripts/demo-scenarios.sh 4
    

    Demo scenario 4 detects the edition, looks up a product, calls the similarity endpoint, and displays results with appropriate messaging for either edition.


    What’s Next

    Bring your own model. The EmbeddingProvider interface makes swapping models easy. Define a bean, return vectors, done:

    @Bean
    public EmbeddingProvider embeddingProvider() {
        return new EmbeddingProvider() {
            private final OpenAiEmbeddingModel model = /* your config */;
    
            @Override
            public float[] embed(String text) {
                return model.embed(text).content().vector();
            }
    
            @Override
            public int getDimension() { return 1536; }
    
            @Override
            public String getModelName() { return "text-embedding-3-small"; }
        };
    }
    

    OpenAI, Cohere, any Sentence-Transformer via LangChain4j’s ONNX integration — just update framework.vectorstore.dimension to match.

    Hybrid search is the obvious next step: “find products similar to this laptop, but only in Electronics and under $1000.” That combines a VectorCollection similarity search with IMap predicate filtering. It’s also exactly the kind of natural-language query that works well with AI-driven orchestration — an LLM can decompose that request into a similarity lookup plus attribute filters, call the right APIs, and merge the results. We’ll get into that in an upcoming post on the Model Context Protocol (MCP), which gives AI models structured access to our microservices.

    Multi-modal search is possible too. Hazelcast’s VectorCollection supports multiple named indexes on a single collection — one for text embeddings, another for image embeddings. Same data structure, different similarity dimensions.


    Next up: AI-Powered Microservices with the Model Context Protocol

    Previous: Saga Pattern: Distributed Transactions Without 2PC

    Code: github.com/myawnhc/hazelcast-microservices-framework — clone it, docker-compose up, and the framework boots locally with sample data.
  • On Debugging by Assumption

    A short interstitial in the “Building Event-Driven Microservices with Hazelcast” series


    “It ain’t what you don’t know that gets you into trouble. It’s what you know for sure that just ain’t so.” — Mark Twain

    Measurement is better than guessing. Who knew?

    When the saga implementation was first finished and we ran through the test scenarios for the first time, there was a high incidence of saga timeouts if we ran for more than a few minutes. (5 minutes was great; 30 minutes was ugly.)

    I didn’t ask Claude to investigate, or do any analysis of my own, because I had a pretty good suspicion of what was going on. Everything was running on a single laptop: 16GB of memory split across a 3-node Hazelcast cluster, 4 services each running an embedded Hazelcast node, and Docker itself, and I hadn’t bothered to shut off my normal desktop workload either. Web browser, email, whatever else. I figured I’d maxed the poor thing out and probably wouldn’t get a clean timeout-free run until I deployed to a multi-node cluster in the cloud.

    That didn’t happen for some time. I’m cheap, and I wasn’t going to pay for cloud resources until I had a full-blown demo ready to go. When the day came (much later in the story, if I were telling it chronologically), I saw the same pattern of timeouts. Turns out, it was never thread starvation or lack of resources. It was a combination of things, and none of them were what I’d assumed.

    When faced with this reality, I asked Claude to troubleshoot the issue, and this is one of the times I was most impressed with how Claude approached a problem compared to how I would have.

    In most debugging scenarios, I look only until I find the first reasonable suspect. Why keep looking if you’ve already found what you’re looking for? Fix, rebuild, retest, and on a good day, that’s the end of it. On a bad day, you’re still looking at the same issue, so you start hunting for suspect #2. Lather, rinse, repeat.

    Claude came back with four identified problems. The main one was subtle: we generated product data up front and gave each item a reasonable starting stock. As the demo ran, orders depleted the stock, and eventually the inventory service started throwing InsufficientStockException — correct behavior, you can’t sell what you don’t have. But the circuit breaker we’d added for resilience was treating that business error the same as an infrastructure failure. Enough “failures” in the sliding window and the circuit breaker tripped open, rejecting all orders — including ones for products that still had stock. Sagas piled up with nowhere to go, the timeout detector found hundreds of them every cycle, and the system drowned in compensation events. At the peak: 64,000 timeouts from 53,000 sagas started.

    The other three fixes addressed related gaps. Business failures like out-of-stock now trigger immediate saga compensation instead of waiting for the timeout detector to notice. A NonRetryableException marker interface tells the circuit breaker not to count deterministic business errors against the failure rate. And an automatic stock replenishment monitor keeps the demo in a steady state where orders can actually succeed for hours instead of wedging after the first few minutes.
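
    To make the marker-interface idea concrete, here is a minimal sketch in plain Java. SimpleCircuitBreaker is a hypothetical count-based breaker written only for this illustration; it is not the framework’s class or any library’s API. Only the names NonRetryableException and InsufficientStockException come from the story above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Marker interface: exceptions that implement it are deterministic business errors.
interface NonRetryableException {}

class InsufficientStockException extends RuntimeException implements NonRetryableException {
    InsufficientStockException(String message) { super(message); }
}

// Hypothetical, simplified count-based breaker (no half-open state, no timers).
class SimpleCircuitBreaker {
    private final int windowSize;
    private final double failureRateThreshold;                 // 0.5 means "open at 50% failures"
    private final Deque<Boolean> window = new ArrayDeque<>();  // true = failure
    private boolean open;

    SimpleCircuitBreaker(int windowSize, double failureRateThreshold) {
        this.windowSize = windowSize;
        this.failureRateThreshold = failureRateThreshold;
    }

    boolean isOpen() { return open; }

    void onSuccess() { record(false); }

    void onError(Throwable error) {
        if (error instanceof NonRetryableException) {
            return; // business error: the dependency is healthy, so it is not counted
        }
        record(true);
    }

    private void record(boolean failure) {
        window.addLast(failure);
        if (window.size() > windowSize) {
            window.removeFirst();
        }
        if (window.size() == windowSize) {
            long failures = window.stream().filter(f -> f).count();
            open = (double) failures / windowSize >= failureRateThreshold;
        }
    }
}
```

    With this shape, any number of out-of-stock errors leaves the breaker closed, while a full window of infrastructure failures still opens it.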

    I should have investigated the saga timeouts when they first appeared, rather than assuming the problem would magically go away with more hardware. And when I did get around to investigating, Claude’s approach of identifying all the contributing problems at once was considerably more effective than my usual one-suspect-at-a-time strategy.

    Code: github.com/myawnhc/hazelcast-microservices-framework — clone it, docker-compose up, and the framework boots locally with sample data.
  • Saga Pattern: Distributed Transactions Without 2PC

    Saga Pattern: Distributed Transactions Without 2PC

    Part 4 in the “Building Event-Driven Microservices with Hazelcast” series


    Introduction

    In the first three articles, we built an event sourcing framework with a Jet pipeline and materialized views. Each service is self-contained — it owns its events, its views, and its data. And one of the things that makes event-driven architecture powerful is that event publishing is fire-and-forget. The publisher doesn’t know or care who’s receiving the messages, or whether anyone acted on them properly.

    But someone cares. Probably your boss.

    That decoupling is a feature, not a bug — it’s exactly what gives you the independence to evolve services separately. If you wanted to add a loyalty program based on orders placed, you wouldn’t touch a single line of existing code. You’d build the new functionality and subscribe it to OrderCreated events. Done. No coordination, no release trains, no “can you add a field to your response for us.”

    But that same independence becomes a problem when a business operation requires coordination. Placing an order in our eCommerce system is a four-step flow that crosses service boundaries:

    1. Order Service creates the order
    2. Inventory Service reserves stock
    3. Payment Service charges the customer
    4. Order Service confirms the order

    If payment fails after stock is reserved, we need to release the stock. If the inventory service is down, we need to cancel the order. The fire-and-forget publisher doesn’t know any of this happened — and nobody else is keeping track unless we build something that does.

    Now — a real eCommerce system wouldn’t cancel an order just because the inventory service hiccupped. You’d create a backorder, or queue the reservation for retry, or do whatever it takes to keep the customer’s money. Nobody in the business is going to say “yeah, let’s give up on that sale because a container restarted.” But we’re building a framework to demonstrate microservice patterns, not to compete with Shopify. The cancel-and-compensate flow shows sagas at their most illustrative: forward steps, failure detection, compensation, and state tracking across services. That’s the machinery we want to examine. So we went with the version that best shows how the pattern works, not the version that best sells laptops.

    This is the distributed transaction problem, and sagas are the solution.


    Why Sagas? Because the Alternative Is Worse.

    We didn’t adopt event sourcing because it was fashionable. We adopted it because the alternative — mutable state scattered across services with no audit trail — was worse. Sagas are the same kind of choice.

    When a business operation touches one database, you wrap it in a transaction. ACID gives you atomicity: either all the changes commit or none of them do. Every developer learns this in their first database course, and it works.

    But our order fulfillment touches four services, each with its own data. A single database transaction can’t span them. So you reach for two-phase commit — 2PC — the textbook answer for distributed transactions. And that’s where the trouble starts.

    2PC works by appointing a coordinator that asks each participant “can you commit?” and then, once everyone says yes, tells them all to go ahead. The problem is what happens between those two phases. Every participant is holding locks — on inventory rows, on payment records, on order state — while waiting for the coordinator’s decision. In a monolith, that wait is microseconds. Across a network, it’s milliseconds at best, and if the coordinator is slow or the network partitions, those locks can be held for seconds. Or longer.

    Think about what that means for throughput. While one order is sitting in that limbo state between “prepare” and “commit,” no other transaction can touch those same inventory rows. At 50 orders per second, you’ve got 50 sets of distributed locks competing for the same resources, each one waiting on network round-trips to a coordinator that is itself a single point of failure. If the coordinator crashes mid-protocol, those locks stay held until someone intervenes manually. The participants are stuck — they’ve promised to commit but haven’t been told to, and they can’t unilaterally release the locks without risking inconsistency.

    It’s not that 2PC is theoretically wrong. It works fine in environments where all participants are on the same local network, latency is sub-millisecond, and the coordinator never fails. That environment is called “a single database server.” Once you’ve distributed beyond that, you’re fighting physics.

    Sagas take a fundamentally different approach. Instead of one big atomic transaction with distributed locks, you execute a sequence of local transactions — each one fast, each one touching only one service’s data, each one committing immediately. If step 3 fails, you don’t roll back steps 1 and 2 by releasing locks. You compensate — you issue new transactions that undo the business effect of the earlier steps. Reserve stock, then charge payment, then… payment fails? Issue a stock release. The saga doesn’t pretend the work never happened. It acknowledges what happened and fixes it forward.

    The trade-off is that you give up the clean atomicity of “all or nothing” and accept eventual consistency — a window of time where stock is reserved but payment hasn’t been attempted yet, or payment has been charged but the order isn’t confirmed. In our system, that window is typically under a second. For the user, the order is “processing.” For the system, the saga is in flight. And if something fails, compensation events fire and the system converges to a consistent state — just not instantaneously.
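
    The forward-then-compensate idea can be sketched in a few lines of plain Java. This is an in-process simplification with hypothetical names (SagaStep, SagaRunner); in the framework the steps run in separate services and are driven by events, not by a local loop.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// One saga step: a local transaction plus the action that undoes its business effect.
record SagaStep(String name, Runnable action, Runnable compensation) {}

class SagaRunner {
    // Runs the steps in order. If one fails, the steps that already committed are
    // compensated in reverse order. Returns true only if every step committed.
    static boolean run(List<SagaStep> steps) {
        Deque<SagaStep> completed = new ArrayDeque<>();
        for (SagaStep step : steps) {
            try {
                step.action().run();
                completed.push(step);
            } catch (RuntimeException e) {
                while (!completed.isEmpty()) {
                    completed.pop().compensation().run();
                }
                return false;
            }
        }
        return true;
    }
}
```

    Note that the failing step itself is not compensated: it never committed, so there is nothing to undo.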


    Choreography and Orchestration

    There are two ways to coordinate a saga. They represent genuinely different architectural philosophies, and neither is universally right.

    Choreography: Services React to Events

    In a choreographed saga, there’s no coordinator. Each service listens for events that concern it and reacts:

    Order Service publishes OrderCreated -->
      Inventory Service hears it, reserves stock, publishes StockReserved -->
        Payment Service hears it, charges customer, publishes PaymentProcessed -->
          Order Service hears it, confirms order
    

    The flow emerges from the interactions between services. No single component knows the full sequence. Each service knows only “when I see event X, I do Y and publish Z.”
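
    As a rough sketch of that “when I see X, I do Y and publish Z” rule, here is a toy version in plain Java. InMemoryEventBus is a hypothetical, synchronous stand-in for a topic; it is not Hazelcast’s ITopic, and the listeners do no real work beyond publishing the next event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a topic-based event bus.
class InMemoryEventBus {
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();
    private final List<String> published = new ArrayList<>();

    void subscribe(String eventType, Consumer<String> listener) {
        listeners.computeIfAbsent(eventType, k -> new ArrayList<>()).add(listener);
    }

    void publish(String eventType, String orderId) {
        published.add(eventType);
        for (Consumer<String> listener : listeners.getOrDefault(eventType, List.of())) {
            listener.accept(orderId);
        }
    }

    List<String> published() { return List.copyOf(published); }
}

class ChoreographyDemo {
    // Each "service" registers only its own rule; none of them sees the whole chain.
    static InMemoryEventBus wire() {
        InMemoryEventBus bus = new InMemoryEventBus();
        bus.subscribe("OrderCreated", id -> bus.publish("StockReserved", id));      // inventory
        bus.subscribe("StockReserved", id -> bus.publish("PaymentProcessed", id));  // payment
        bus.subscribe("PaymentProcessed", id -> bus.publish("OrderConfirmed", id)); // order
        return bus;
    }
}
```

    Publishing a single OrderCreated event on the wired bus drives the whole chain, even though no listener mentions more than two event types.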

    Orchestration: A Central Controller Directs the Flow

    In an orchestrated saga, one component — the orchestrator — knows the whole sequence and directs each step:

    Orchestrator --> "Order Service, create order"
    Orchestrator --> "Inventory Service, reserve stock"
    Orchestrator --> "Payment Service, charge customer"
    Orchestrator --> "Order Service, confirm order"
    

    The orchestrator is a state machine. It tracks where the saga is, what comes next, and what to undo if something fails.

    Choosing Between Them

    Choreography is the simpler choice when your saga is a linear chain — A triggers B triggers C triggers D — and the services are already publishing events for other reasons. If your system is event-driven (ours is), choreographed sagas are almost free. The services are already emitting events. The saga is just another consumer. No new infrastructure, no single point of failure, and each service stays fully independent.

    But choreography gets painful as the flow gets complex. Say step 2 needs to branch — charge a credit card or apply store credit, depending on the payment method. Now you’re encoding conditional logic across multiple listeners that can’t see each other. If someone asks “what does this saga actually do, end to end?” the answer is “go read three listener classes in three different services and piece it together.” For a four-step linear chain, fine. For a ten-step flow with branches and conditional skips, that’s a mess.

    That’s where orchestration earns its keep. The entire saga — forward steps, compensation steps, timeouts, retry logic — lives in a single definition file. You read it top to bottom. You can add per-step timeouts, per-step retries. The caller can wait for the result synchronously. The cost is coupling: the orchestrator needs to know about every service’s endpoint, and it becomes a component you have to keep running.

    A rough rule of thumb:

    Start with choreography when the saga is a linear chain with fewer than about five steps, the services already publish events, and you don’t need a synchronous response. Move to orchestration when you need branching or conditional logic, per-step timeout and retry control, a synchronous success/failure response for the caller, or when the saga has gotten complex enough that nobody can trace it across listeners without a whiteboard.

    We started with choreography for order fulfillment because it’s a clean four-step chain and our entire architecture is already built around events. Later, when we needed synchronous responses for certain API paths and wanted per-step timeout control, we added orchestration as a second option. Both patterns coexist in the framework, running against the same saga state store. We’ll cover the orchestration implementation and how we run both side by side in a later article.


    The Order Fulfillment Saga

    Here’s the full flow for the choreographed version:

    Order Fulfillment Saga happy path — four services emitting OrderCreated, StockReserved, PaymentProcessed, OrderConfirmed in sequence, with saga state transitioning from STARTED through IN_PROGRESS to COMPLETED

    Event Context Propagation

    Every saga event carries metadata that links the chain together:

    // These fields are present on every saga event
    String sagaId;         // Links all events in one saga instance
    String correlationId;  // Links to the original API request
    String sagaType;       // "OrderFulfillment"
    int stepNumber;        // Which step (0, 1, 2, 3)
    boolean isCompensating; // True for compensation events
    

    The sagaId is generated when the order is created and propagated through every subsequent event. That’s how the Inventory Service knows which saga an OrderCreated event belongs to, and how the Payment Service gets the payment details (amount, currency, method) from the StockReserved event’s context fields.
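
    One way to picture the propagation is a small immutable carrier that each service copies forward. SagaContext and its methods are hypothetical names for this sketch, and generating the sagaId from a random UUID is an assumption; the framework stores these fields on the event record itself.

```java
import java.util.UUID;

// Hypothetical carrier for the metadata fields listed above.
record SagaContext(String sagaId, String correlationId, String sagaType,
                   int stepNumber, boolean isCompensating) {

    // Starts a new saga at step 0 (assumption: the sagaId is a random UUID).
    static SagaContext start(String sagaType, String correlationId) {
        return new SagaContext(UUID.randomUUID().toString(), correlationId, sagaType, 0, false);
    }

    // Context for the next forward event: same identifiers, step number advanced by one.
    SagaContext nextStep() {
        return new SagaContext(sagaId, correlationId, sagaType, stepNumber + 1, isCompensating);
    }

    // Context for a compensation event that undoes the current step.
    SagaContext compensating() {
        return new SagaContext(sagaId, correlationId, sagaType, stepNumber, true);
    }
}
```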


    Implementing Saga Listeners

    Each service has a saga listener that subscribes to relevant events via Hazelcast ITopic. Here’s the Payment Service’s:

    @Component
    public class PaymentSagaListener {
    
        public PaymentSagaListener(
                @Qualifier("hazelcastClient") HazelcastInstance hazelcast,
                PaymentService paymentService,
                SagaStateStore sagaStateStore) {
    
            // Listen for StockReserved --> process payment
            ITopic<GenericRecord> stockReservedTopic = hazelcast.getTopic("StockReserved");
            stockReservedTopic.addMessageListener(message -> {
                GenericRecord event = message.getMessageObject();
                String sagaId = event.getString("sagaId");
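                String orderId = event.getString("orderId");
                // Assumption: the event carries the customer under a "customerId" field
                String customerId = event.getString("customerId");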
                String amount = event.getString("paymentAmount");
                String currency = event.getString("paymentCurrency");
                String method = event.getString("paymentMethod");
    
                paymentService.processPaymentForOrder(
                    orderId, customerId, amount, currency, method,
                    sagaId, event.getString("correlationId")
                );
            });
    
            // Listen for PaymentRefundRequested --> process refund
            ITopic<GenericRecord> refundTopic = hazelcast.getTopic("PaymentRefundRequested");
            refundTopic.addMessageListener(message -> {
                GenericRecord event = message.getMessageObject();
                String paymentId = event.getString("paymentId");
                String sagaId = event.getString("sagaId");
    
                paymentService.refundPaymentForSaga(
                    paymentId, "Saga compensation", sagaId,
                    event.getString("correlationId")
                );
            });
        }
    }
    

    The listener uses @Qualifier("hazelcastClient"), so it connects to the shared cluster, not the embedded instance. That’s the dual-instance architecture from Part 2. Each listener is a plain @Component that Spring creates on startup; the ITopic subscription stays active for the life of the service. And the listener itself doesn’t contain business logic: it unpacks the event, delegates to the service layer, and gets out of the way.


    Compensation: Undoing Work

    When a step fails, we need to undo the work completed by previous steps. This is compensation — often described as the saga equivalent of a rollback, though it isn’t really a rollback at all. More on that in a minute.

    Compensation Flow: Payment Fails

    Order Fulfillment Saga compensation flow — payment fails at step 3, triggering reverse-order compensation events PaymentRefundRequested, StockReleased, and OrderCancelled, ending in the COMPENSATED state

    Compensation Flow: Stock Unavailable

    StockReservationFailed event
             |
             '---> Order Service
                   '-- Cancels order (status: CANCELLED)
                   '-- No stock release needed (nothing was reserved)
    
             Saga finalized as COMPENSATED
    

    The Compensation Registry

    A CompensationRegistry maps each forward event to its compensating event and responsible service:

    @Configuration
    public class ECommerceCompensationConfig {
    
        @Bean
        public CompensationRegistrar ecommerceCompensations(CompensationRegistry registry) {
            // Step 0: OrderCreated --> compensate with OrderCancelled
            registry.register("OrderCreated", "OrderCancelled", "order-service");
    
            // Step 1: StockReserved --> compensate with StockReleased
            registry.register("StockReserved", "StockReleased", "inventory-service");
    
            // Step 2: PaymentProcessed --> compensate with PaymentRefunded
            registry.register("PaymentProcessed", "PaymentRefunded", "payment-service");
    
            // Step 3: OrderConfirmed --> no compensation (terminal success)
            return () -> {};
        }
    }
    

    This is the one place that documents the entire saga structure. Even though the execution is distributed across listeners, the registry makes the step-to-compensation mapping readable in a single file. That’s something people miss about choreography — it doesn’t mean the structure is undocumented, it means the structure is declared separately from the execution.


    Saga State Tracking

    The SagaState class is an immutable state machine that tracks each saga instance:

    public class SagaState implements Serializable {
    
        private final String sagaId;
        private final String sagaType;         // "OrderFulfillment"
        private final String correlationId;
        private final SagaStatus status;       // STARTED --> IN_PROGRESS --> COMPLETED
        private final List<SagaStepRecord> steps;
        private final Instant startedAt;
        private final Instant deadline;        // Absolute timeout
    }
    

    Status transitions:

    STARTED --> IN_PROGRESS --> COMPLETED     (happy path)
    STARTED --> IN_PROGRESS --> COMPENSATING --> COMPENSATED  (failure + recovery)
    STARTED --> IN_PROGRESS --> TIMED_OUT --> COMPENSATING --> COMPENSATED  (timeout)
    STARTED --> IN_PROGRESS --> FAILED        (unrecoverable)
    

    The state lives in a HazelcastSagaStateStore backed by an IMap on the shared cluster. Every service can read and update saga state because they all connect to the same shared cluster via the client instance.
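
    The transition table above could be encoded directly in the enum, so that an illegal move is rejected before it reaches the store. This is a sketch under assumptions: the value names are taken from the diagram, but the methods allowedNext, canTransitionTo, and isTerminal are hypothetical and may not exist on the framework’s SagaStatus.

```java
import java.util.EnumSet;
import java.util.Set;

enum SagaStatus {
    STARTED, IN_PROGRESS, COMPLETED, COMPENSATING, COMPENSATED, TIMED_OUT, FAILED;

    // Legal next states, mirroring the four paths in the diagram.
    Set<SagaStatus> allowedNext() {
        return switch (this) {
            case STARTED -> EnumSet.of(IN_PROGRESS);
            case IN_PROGRESS -> EnumSet.of(COMPLETED, COMPENSATING, TIMED_OUT, FAILED);
            case TIMED_OUT -> EnumSet.of(COMPENSATING);
            case COMPENSATING -> EnumSet.of(COMPENSATED);
            case COMPLETED, COMPENSATED, FAILED -> EnumSet.noneOf(SagaStatus.class);
        };
    }

    boolean canTransitionTo(SagaStatus next) {
        return allowedNext().contains(next);
    }

    // Terminal states have no outgoing transitions.
    boolean isTerminal() {
        return allowedNext().isEmpty();
    }
}
```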

    Each step gets recorded:

    public class SagaStepRecord implements Serializable {
    
        private final int stepNumber;
        private final String eventType;
        private final StepStatus status;    // PENDING, COMPLETED, FAILED, COMPENSATED
        private final Instant completedAt;
        private final String failureReason;
    }
    

    Timeout Handling

    Sagas can get stuck. A service might be down, a message might be lost, or a listener might throw an unhandled exception. Without timeout detection, a stuck saga hangs forever — stock reserved but never charged, or charged but never confirmed.

    The SagaTimeoutDetector is a scheduled service that runs in each service instance:

    @Component
    public class SagaTimeoutDetector {
    
        @Scheduled(fixedDelayString = "${saga.timeout.check-interval:5000}")
        public void detectTimeouts() {
            List<SagaState> timedOut = sagaStateStore.findTimedOutSagas(
                maxBatchSize
            );
    
            for (SagaState saga : timedOut) {
                sagaStateStore.markTimedOut(saga.getSagaId());
    
                if (autoCompensate) {
                    compensator.compensate(saga);
                }
    
                applicationEventPublisher.publishEvent(
                    new SagaTimedOutEvent(saga)
                );
    
                sagaMetrics.recordSagaTimedOut(saga.getSagaType());
            }
        }
    }
    

    Every five seconds, it looks for sagas that have blown past their deadline. When it finds one, it marks it timed out, triggers compensation for whatever steps already completed, publishes a Spring event for logging and alerting, and records the metric.
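
    The lookup behind findTimedOutSagas might look something like the following sketch, which uses an in-memory map in place of the shared IMap. TrackedSaga and InMemorySagaStore are hypothetical names, and passing the current time as a parameter (to keep the example deterministic) is an assumption; the framework’s method takes only the batch size.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, trimmed-down saga record for this sketch.
record TrackedSaga(String sagaId, Instant deadline, boolean finished) {}

class InMemorySagaStore {
    private final Map<String, TrackedSaga> sagas = new ConcurrentHashMap<>();

    void save(TrackedSaga saga) {
        sagas.put(saga.sagaId(), saga);
    }

    // Unfinished sagas whose deadline is before 'now', most overdue first,
    // capped at maxBatchSize so a single check cannot flood the compensator.
    List<TrackedSaga> findTimedOutSagas(Instant now, int maxBatchSize) {
        return sagas.values().stream()
                .filter(s -> !s.finished() && s.deadline().isBefore(now))
                .sorted(Comparator.comparing(TrackedSaga::deadline))
                .limit(maxBatchSize)
                .toList();
    }
}
```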

    Timeout behavior is configurable per service:

    saga:
      timeout:
        enabled: true
        check-interval: 5000          # Check every 5 seconds
        default-deadline: 30000       # 30-second default timeout
        auto-compensate: true         # Auto-trigger compensation
        max-batch-size: 100           # Process up to 100 timeouts per check
        saga-types:
          OrderFulfillment: 60000     # 60 seconds for order fulfillment
    

    The Order Fulfillment saga gets 60 seconds — longer than the 30-second default — because it spans four services and includes payment processing, which can be slow. Choosing the right timeout value is harder than it sounds, and we have quite a bit more to say about that in the next article.


    Monitoring Sagas

    The SagaMetrics class exposes counters and timers to Prometheus:

    | Metric | What It Tells You |
    | --- | --- |
    | saga.started | How many sagas are being created |
    | saga.completed | How many succeed end-to-end |
    | saga.compensated | How many required rollback |
    | saga.timedout | How many exceeded their deadline |
    | saga.duration (p95) | How long sagas are taking |
    | saga.compensation.duration (p95) | How long compensation takes |

    The pre-provisioned Saga Dashboard in Grafana visualizes active saga counts, throughput rates by saga type, duration percentiles, success rates, timeout detection rates, and compensation breakdowns. Pre-configured alerts fire on high failure rates, timeouts, compensation failures, and success rates that drop below 90%.

    The dashboard is useful. But the metric that actually saved us during a sustained load incident was dead simple: saga.timedout plotted alongside saga.completed. When timeouts exceeded completions, we knew we had a systemic problem, not just a few slow sagas. More on that story next time.


    Design Decisions and Trade-offs

    Eventual Consistency

    Sagas provide eventual consistency, not immediate consistency. Between the time stock is reserved and payment is processed, the system is in a “pending” state. That’s intentional — the trade-off buys us service independence and availability. For our use case, the consistency window is sub-second. For domains where that’s not acceptable (financial settlement, medical records), you’d need stronger guarantees.

    Idempotency

    Saga listeners must be idempotent. ITopic delivery is at-most-once in Hazelcast, so the topic itself won’t redeliver a message, but the timeout detector might trigger compensation for a saga that was actually processing, just slowly. If the original flow completes after compensation starts, the system has to handle the overlap gracefully.

    SagaState handles this with updateOrAddStep, which replaces by step number. A step can’t be recorded twice.
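
    A replace-by-step-number update on an immutable holder could be sketched as follows. StepRecord and StepLog are hypothetical, reduced stand-ins for SagaStepRecord and SagaState; only the method name updateOrAddStep comes from the framework, and its real signature may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced step record (status kept as a plain string for brevity).
record StepRecord(int stepNumber, String eventType, String status) {}

// Hypothetical immutable holder for the recorded steps.
record StepLog(List<StepRecord> steps) {
    StepLog {
        steps = List.copyOf(steps); // defensive copy keeps the holder immutable
    }

    // Returns a new StepLog in which the entry with the same step number is
    // replaced, or the step is appended if that number has not been seen yet.
    StepLog updateOrAddStep(StepRecord step) {
        List<StepRecord> updated = new ArrayList<>();
        boolean replaced = false;
        for (StepRecord existing : steps) {
            if (existing.stepNumber() == step.stepNumber()) {
                updated.add(step);
                replaced = true;
            } else {
                updated.add(existing);
            }
        }
        if (!replaced) {
            updated.add(step);
        }
        return new StepLog(updated);
    }
}
```

    Because the step number is the identity, recording step 1 a second time overwrites the earlier entry instead of adding a duplicate.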

    Compensation Is Not Rollback

    This is worth being explicit about, because the terminology invites confusion.

    Compensation undoes the business effect, not the technical state. When we “refund a payment,” we don’t delete the payment record. We create a new PaymentRefundedEvent that changes the payment status to REFUNDED. The event history preserves the full story: payment was processed, then refunded. If an auditor comes asking, the trail is complete.

    This fits naturally with event sourcing. The compensation is just another event. Nothing is erased, nothing is pretended away. The system records what actually happened, including the part where something went wrong and was corrected.


    Summary

    The saga pattern solves distributed transaction coordination without distributed locks:

    Choreographed sagas fit naturally with event sourcing — each service reacts to events independently, no coordinator required. Orchestrated sagas earn their place when flows grow complex, need synchronous responses, or require per-step control. Compensation provides rollback-like semantics through new events, preserving the full history. Timeout detection catches stuck sagas. Saga state tracking gives you a complete audit trail.

    The result is a system where four independent services coordinate a complex business transaction without any service knowing about the others — they only know about events. The cost is that you’re managing a distributed state machine instead of a database transaction. But the alternative — distributed locks held across network calls while a coordinator decides everyone’s fate — is worse.


    Next up: Saga Timeouts — When Distributed Things Go Wrong

    Previous: Materialized Views for Fast Queries

    Code: github.com/myawnhc/hazelcast-microservices-framework — clone it, docker-compose up, and the framework boots locally with sample data.
  • A Note on Observability

    A short interstitial in the “Building Event-Driven Microservices with Hazelcast” series


    Originally, the fourth post in this series was about observability — metrics, tracing, dashboards. Because the posts were being written alongside the code, it only covered what existed at the time: the event sourcing and saga metrics. As the framework grew, metrics got added in a lot of places — persistence, resilience features, a business-oriented dashboard.

    A post on observability that covered a fraction of the actual observability felt incomplete, so it was expanded and moved later in the sequence. It’ll be around post 12.

    But the blog post moved. The implementation didn’t. Observability isn’t something you bolt on after everything else is working — it’s how you know whether everything else is working. If you’re building a multi-phase project like this, instrument early. You’ll be glad you did when something breaks at 2 AM on phase 3 that would have been obvious from a dashboard you built in phase 1.

    Code: github.com/myawnhc/hazelcast-microservices-framework — clone it, docker-compose up, and the framework boots locally with sample data.
  • Hazelcast Materialized Views: CQRS for Sub-ms Reads

    Part 3 in the “Building Event-Driven Microservices with Hazelcast” series


    Introduction

    We’ve established that events are the source of truth (Part 1) and built a Jet pipeline to process them (Part 2). But we’ve been dancing around a question that anyone who’s thought about event sourcing for more than five minutes will ask:

    If everything is stored as a sequence of events, how do you actually query anything?


    The Problem with Raw Events

    The naive answer is “replay them.” Want the current state of customer 123? Walk through every event that ever happened to customer 123 and apply them in order:

    public Customer getCustomer(String customerId) {
        List<Event> events = eventStore.getEventsForKey(customerId);
    
        Customer customer = null;
        for (Event event : events) {
            customer = event.apply(customer);
        }
        return customer;
    }
    

    This works. It’s also terrible. A customer with 10 events takes 10x longer to load than a brand-new customer with 1. A customer who’s been active for three years and has 1,000 events? Good luck serving that from an API endpoint with a latency SLA.

    You need O(1) lookups, not O(n) replays on every GET request.


    CQRS: The Pattern You Don’t Get to Opt Out Of

    This is where event sourcing stops being a standalone pattern and starts demanding an architectural partner.

    In a CRUD system, one table does everything. You INSERT into it, UPDATE it, and SELECT from it. The read model and the write model are the same thing — a row in a database. Simple.

    Event sourcing breaks that. Your write model is an append-only event log. Great for durability, great for auditing, terrible for queries. You can’t SELECT a customer’s current address from a log of every change that’s ever happened to them. Not efficiently, anyway.

    So you need a separate read model. A structure that’s optimized for the queries your application actually needs. This separation — commands go to the write model, queries go to the read model — is CQRS: Command Query Responsibility Segregation.

    We didn’t choose CQRS because it appeared on a list of architecture buzzwords. Event sourcing forced us into it. Once your source of truth is an event log, you need a separate read path. There’s no way around it.

    Now, CQRS is a general pattern. The read model could be a relational database you project events into for complex SQL queries. It could be Elasticsearch for full-text search, or a data warehouse for analytics. In our framework, we project into Hazelcast IMaps — materialized views that update in real-time as events flow through the Jet pipeline. The IMap gives us sub-millisecond lookups and keeps the read model co-located with the processing engine. No network hop, no separate database to manage.

    Materialized views are our implementation of the read side of CQRS.


    What is a Materialized View?

    A materialized view is a pre-computed projection. Instead of computing state on every query, you compute it once — when the event is processed in Stage 4 of the pipeline — and store the result. Queries just look up the stored result. In event sourcing literature, the component responsible for maintaining these projections is sometimes called a projection engine. In our framework, that’s the Jet pipeline from Part 2.

    Event replay O(n) vs materialized view O(1)

    The write path pays the cost once. Every subsequent read is free. The more reads you have per write — and most systems are read-heavy — the bigger the win.
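
    The contrast between the two read paths can be shown with a toy projection in plain Java. AddressChanged and CustomerAddressProjection are hypothetical names for this sketch, and a HashMap stands in for the IMap-backed view.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type for this sketch.
record AddressChanged(String customerId, String address) {}

class CustomerAddressProjection {
    private final List<AddressChanged> eventLog = new ArrayList<>(); // write model
    private final Map<String, String> view = new HashMap<>();        // read model

    // Write path: append the event, then update the view once.
    void append(AddressChanged event) {
        eventLog.add(event);
        view.put(event.customerId(), event.address());
    }

    // O(n) read: walk the whole log and keep the last matching address.
    String addressByReplay(String customerId) {
        String address = null;
        for (AddressChanged event : eventLog) {
            if (event.customerId().equals(customerId)) {
                address = event.address();
            }
        }
        return address;
    }

    // O(1) read: a single map lookup.
    String addressFromView(String customerId) {
        return view.get(customerId);
    }
}
```

    Both reads return the same answer; the difference is that the replay cost grows with the log, while the view lookup does not.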


    The ViewStore

    The HazelcastViewStore wraps an IMap:

    public class HazelcastViewStore<K> {
    
        private final IMap<K, GenericRecord> viewMap;
        private final String viewName;
    
        public HazelcastViewStore(HazelcastInstance hazelcast, String viewName) {
            this.viewName = viewName + "_VIEW";
            this.viewMap = hazelcast.getMap(this.viewName);
        }
    
        public Optional<GenericRecord> get(K key) {
            return Optional.ofNullable(viewMap.get(key));
        }
    
        public void put(K key, GenericRecord value) {
            viewMap.set(key, value);
        }
    
        public void remove(K key) {
            viewMap.delete(key);
        }
    
        public GenericRecord executeOnKey(K key,
                EntryProcessor<K, GenericRecord, GenericRecord> processor) {
            return viewMap.executeOnKey(key, processor);
        }
    }
    

    We use GenericRecord for the values — Hazelcast’s schema-flexible format that doesn’t require Java classes on the cluster. The executeOnKey method gives us atomic updates through Hazelcast’s EntryProcessor. As we discussed in Part 2, the EntryProcessor runs on the partition thread for that key — the same single-threaded-per-partition model that makes our pipeline ordering work. It reads current state, applies the change, and writes the result, all on one thread with no possibility of a concurrent modification. Atomic by architecture, not by locking.


    The ViewUpdater

    The ViewUpdater is the abstract class that each domain implements. It defines two things: how to extract the key from an event, and how to apply the event to produce new state.

    public abstract class ViewUpdater<K> implements Serializable {
    
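        protected final transient HazelcastViewStore<K> viewStore;
    
        // Constructor implied by subclasses that call super(viewStore)
        protected ViewUpdater(HazelcastViewStore<K> viewStore) {
            this.viewStore = viewStore;
        }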
    
        protected abstract K extractKey(GenericRecord eventRecord);
    
        protected abstract GenericRecord applyEvent(
            GenericRecord eventRecord,
            GenericRecord currentState);
    
        public GenericRecord updateDirect(GenericRecord eventRecord) {
            K key = extractKey(eventRecord);
            if (key == null) {
                logger.warn("Could not extract key from event");
                return null;
            }
    
            GenericRecord currentState = viewStore.get(key).orElse(null);
            GenericRecord updatedState = applyEvent(eventRecord, currentState);
    
            if (updatedState != null) {
                viewStore.put(key, updatedState);
            } else if (currentState != null) {
                viewStore.remove(key);
            }
    
            return updatedState;
        }
    }
    

    Returning null from applyEvent is the deletion convention. The updater removes the entry from the view. Everything else either creates a new entry or updates an existing one.


    Example: Customer View

    Here’s what a concrete implementation looks like:

    public class CustomerViewUpdater extends ViewUpdater<String> {
    
        public static final String VIEW_NAME = "Customer";
    
        public CustomerViewUpdater(HazelcastViewStore<String> viewStore) {
            super(viewStore);
        }
    
        @Override
        protected String extractKey(GenericRecord eventRecord) {
            return eventRecord.getString("key");
        }
    
        @Override
        protected GenericRecord applyEvent(GenericRecord event, GenericRecord current) {
            String eventType = getEventType(event);
    
            return switch (eventType) {
                case "CustomerCreated" -> createCustomer(event);
                case "CustomerUpdated" -> updateCustomer(event, current);
                case "CustomerStatusChanged" -> changeStatus(event, current);
                case "CustomerDeleted" -> null;
                default -> {
                    logger.debug("Unknown event type: {}", eventType);
                    yield current;
                }
            };
        }
    
        private GenericRecord createCustomer(GenericRecord event) {
            Instant now = Instant.now();
            return GenericRecordBuilder.compact(VIEW_NAME)
                .setString("customerId", event.getString("key"))
                .setString("email", event.getString("email"))
                .setString("name", event.getString("name"))
                .setString("address", event.getString("address"))
                .setString("phone", getStringField(event, "phone"))
                .setString("status", "ACTIVE")
                .setInt64("createdAt", now.toEpochMilli())
                .setInt64("updatedAt", now.toEpochMilli())
                .build();
        }
    
        private GenericRecord updateCustomer(GenericRecord event, GenericRecord current) {
            if (current == null) {
                logger.warn("Update event for non-existent customer: {}",
                    event.getString("key"));
                return null;
            }
    
            return GenericRecordBuilder.compact(VIEW_NAME)
                .setString("customerId", current.getString("customerId"))
                .setString("email", coalesce(event.getString("email"),
                                             current.getString("email")))
                .setString("name", coalesce(event.getString("name"),
                                            current.getString("name")))
                .setString("address", coalesce(event.getString("address"),
                                               current.getString("address")))
                .setString("phone", coalesce(getStringField(event, "phone"),
                                             current.getString("phone")))
                .setString("status", current.getString("status"))
                .setInt64("createdAt", current.getInt64("createdAt"))
                .setInt64("updatedAt", Instant.now().toEpochMilli())
                .build();
        }
    
        private GenericRecord changeStatus(GenericRecord event, GenericRecord current) {
            if (current == null) return null;
    
            return GenericRecordBuilder.compact(VIEW_NAME)
                .setString("customerId", current.getString("customerId"))
                .setString("email", current.getString("email"))
                .setString("name", current.getString("name"))
                .setString("address", current.getString("address"))
                .setString("phone", current.getString("phone"))
                .setString("status", event.getString("newStatus"))
                .setInt64("createdAt", current.getInt64("createdAt"))
                .setInt64("updatedAt", Instant.now().toEpochMilli())
                .build();
        }
    
        private String coalesce(String newValue, String currentValue) {
            return newValue != null ? newValue : currentValue;
        }
    }
    

    The coalesce pattern in updateCustomer handles partial updates — if the event only includes a new email but not a new name, we keep the existing name. The updatedAt timestamp always advances, which is useful for staleness checks later.


    Cross-Service Views: Denormalization Without the Guilt

    This is where materialized views get genuinely powerful.

    Consider displaying an order. The raw order data has a customer ID and product IDs, but no names, no emails, no SKUs. To show a complete order in the UI, you need data that lives in two other services.

    The traditional approach:

    Order order = orderRepository.findById(orderId);
    Customer customer = accountService.getCustomer(order.getCustomerId());  // HTTP call
    for (LineItem item : order.getItems()) {
        Product product = inventoryService.getProduct(item.getProductId());  // HTTP call
        item.setProductName(product.getName());
    }
    

    Three services involved at query time. If Account is slow, your order page is slow. If Inventory is down, your order page is broken. And you’ve just coupled three services together at runtime, which is exactly what microservices were supposed to prevent.

    With event sourcing, you build an enriched view that bakes in the data from other services at write time:

    public class EnrichedOrderViewUpdater extends ViewUpdater<String> {
    
        private final IMap<String, GenericRecord> customerView;
        private final IMap<String, GenericRecord> productView;
    
        @Override
        protected GenericRecord applyEvent(GenericRecord event, GenericRecord current) {
            String eventType = getEventType(event);
    
            if ("OrderCreated".equals(eventType)) {
                return createEnrichedOrder(event);
            }
            return current;
        }
    
        private GenericRecord createEnrichedOrder(GenericRecord event) {
            String customerId = event.getString("customerId");
    
            // Look up customer from local view — no HTTP call
            GenericRecord customer = customerView.get(customerId);
            String customerName = customer != null ?
                customer.getString("name") : "Unknown";
            String customerEmail = customer != null ?
                customer.getString("email") : "";
    
            List<GenericRecord> enrichedItems = new ArrayList<>();
            // ... iterate through items, look up products from productView
    
            return GenericRecordBuilder.compact("EnrichedOrder")
                .setString("orderId", event.getString("key"))
                .setString("customerId", customerId)
                .setString("customerName", customerName)
                .setString("customerEmail", customerEmail)
                .setArrayOfGenericRecord("lineItems",
                    enrichedItems.toArray(new GenericRecord[0]))
                .setString("status", "PENDING")
                .build();
        }
    }
    

    The result is a single IMap entry with everything you need:

    {
      "orderId": "order-123",
      "customerId": "cust-456",
      "customerName": "Alice Smith",
      "customerEmail": "alice@example.com",
      "lineItems": [
        {
          "productId": "prod-789",
          "productName": "Gaming Laptop",
          "sku": "LAPTOP-001",
          "quantity": 2,
          "unitPrice": 49.99
        }
      ],
      "status": "PENDING"
    }
    

    One lookup. Zero service calls. It works even if Account and Inventory are both down for maintenance.

    If you’re from a relational database background, denormalization feels wrong — it violates third normal form, it duplicates data, your DBA would glare at you. But in an event-sourced system, the events are the normalized source of truth. Views are disposable projections. Denormalize all you want. If Alice changes her name, the CustomerUpdated event flows through, and you can update the enriched order view to reflect it. Or not, depending on whether you care — the order was placed when she was “Alice Smith,” and that’s what the event says.


    View Rebuilding

    One of the benefits we mentioned in Part 1 — and it’s worth seeing in practice. Found a bug in your view update logic? Fix the code, clear the view, replay the events:

    public <D extends DomainObject<K>, E extends DomainEvent<D, K>> long rebuild(
            EventStore<D, K, E> eventStore) {
    
        logger.info("Starting view rebuild for {} from {}",
            viewStore.getViewName(), eventStore.getStoreName());
    
        viewStore.clear();
    
        AtomicLong count = new AtomicLong(0);
        eventStore.replayAll(eventRecord -> {
            updateDirect(eventRecord);
            count.incrementAndGet();
        });
    
        logger.info("Rebuild complete. Processed {} events", count.get());
        return count.get();
    }
    

    The same mechanism handles schema migrations, new view types (write a new ViewUpdater, replay the existing events, and you get an instant backfill), and disaster recovery. With CRUD, data corrupted by a bug usually stays corrupted unless you can restore a backup taken before the bug shipped. With event sourcing, the correct data is recoverable from the event stream for as long as the events themselves are intact.


    View Patterns

    Not every view is a 1:1 entity projection. Here are the patterns we use:

    Entity View — one entry per domain object. Customer events produce one Customer view entry, keyed by customer ID. This is the default.

    Lookup View — index by an alternate key. A Customer-by-Email view maps email → customerId, so you can find a customer by email without scanning. When a customer changes their email, the old entry gets deleted and a new one created.

    Summary View — aggregate across entities. A Customer Order Summary view tracks customerId → { totalOrders, totalSpent }, incrementing on OrderCreated and decrementing on OrderCancelled.

    Enriched View — denormalization across services, as we just covered. Order events plus customer and product data produce a self-contained order view.

    Time-Series View — track changes over time. Daily inventory snapshots keyed by date + productId, updated by stock reservation and release events.

    You can maintain as many views as you need from the same event stream. They’re independent — a bug in one doesn’t affect the others, and each can be rebuilt separately.
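
    The Lookup and Summary patterns are easiest to see side by side. What follows is a minimal sketch, not the framework’s code: plain HashMaps stand in for the view IMaps, and the class name, the method names, and the choice to track money in cents are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in: HashMaps play the role of the view IMaps.
class ViewPatternsSketch {

    // Lookup View: email -> customerId
    final Map<String, String> customerByEmail = new HashMap<>();

    // Summary View: customerId -> {totalOrders, totalSpentCents}
    final Map<String, long[]> orderSummary = new HashMap<>();

    // Email set or changed: delete the old index entry, create the new one.
    void onEmailChanged(String customerId, String oldEmail, String newEmail) {
        if (oldEmail != null) {
            // Two-argument remove only deletes if the entry still maps to this customer.
            customerByEmail.remove(oldEmail, customerId);
        }
        customerByEmail.put(newEmail, customerId);
    }

    // OrderCreated: one more order, more money spent.
    void onOrderCreated(String customerId, long amountCents) {
        orderSummary.merge(customerId, new long[] {1, amountCents},
            (current, added) -> new long[] {current[0] + added[0], current[1] + added[1]});
    }

    // OrderCancelled: undo the increment. Ignored if no summary exists yet.
    void onOrderCancelled(String customerId, long amountCents) {
        orderSummary.computeIfPresent(customerId,
            (id, current) -> new long[] {current[0] - 1, current[1] - amountCents});
    }

    public static void main(String[] args) {
        ViewPatternsSketch views = new ViewPatternsSketch();
        views.onEmailChanged("cust-456", null, "alice@example.com");
        views.onEmailChanged("cust-456", "alice@example.com", "alice@new.example.com");
        views.onOrderCreated("cust-456", 9_998);
        views.onOrderCreated("cust-456", 2_500);
        views.onOrderCancelled("cust-456", 2_500);

        long[] summary = views.orderSummary.get("cust-456");
        System.out.println(views.customerByEmail);
        System.out.println("orders=" + summary[0] + " spentCents=" + summary[1]);
    }
}
```

    In a real view the read-modify-write in the summary handlers would presumably go through executeOnKey, so that the increment stays atomic per key.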


    Handling View Staleness

    Views are updated asynchronously by the pipeline. There’s a brief window — usually measured in milliseconds — where the view might not reflect the very latest event. Three ways to deal with it:

    Accept it. For most reads, a few milliseconds of staleness is fine. Just read the view.

    Customer customer = customerView.get(customerId);
    

    Wait for it. When you need to read your own writes — like returning a newly created customer right after creating them — wait for the pipeline to complete:

    CompletableFuture<EventCompletion> future = controller.handleEvent(event, correlationId);
    future.join();
    
    // Now the view is guaranteed to reflect this event
    Customer customer = customerView.get(customerId);
    

    Check it. Include a version or timestamp and verify the view is current enough:

    long expectedVersion = event.getTimestamp().toEpochMilli();
    Customer customer = customerView.get(customerId);
    if (customer == null || customer.getUpdatedAt() < expectedVersion) {
        // View entry missing or not yet updated: wait, or return the event data directly
    }
    

    Our framework’s handleEvent returns a CompletableFuture specifically to make the “wait for it” path easy. Most API endpoints use it.
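
    When the “check it” path has to wait, it usually turns into a small bounded poll. Here is a rough sketch under stated assumptions: a ConcurrentHashMap stands in for the view, each entry carries a hypothetical version field (a per-entity sequence number rather than a wall-clock timestamp), and readAtLeast is a made-up helper name, not part of the framework.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

class StalenessCheckSketch {

    // Minimal view entry: a payload and the version it reflects (hypothetical shape).
    static final class Entry {
        final String name;
        final long version;

        Entry(String name, long version) {
            this.name = name;
            this.version = version;
        }
    }

    // Polls until the entry reflects at least minVersion or the timeout elapses.
    // Returns empty on timeout so the caller can fall back to the event data.
    static Optional<Entry> readAtLeast(Map<String, Entry> view, String key,
                                       long minVersion, long timeoutMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (true) {
            Entry entry = view.get(key);
            if (entry != null && entry.version >= minVersion) {
                return Optional.of(entry);
            }
            if (System.nanoTime() - deadline >= 0) {
                return Optional.empty();
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Entry> view = new ConcurrentHashMap<>();
        view.put("cust-456", new Entry("Alice Smith", 1));

        // Simulate the pipeline applying version 2 a little later.
        new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException ignored) {
                return;
            }
            view.put("cust-456", new Entry("Alice Jones", 2));
        }).start();

        Optional<Entry> fresh = readAtLeast(view, "cust-456", 2, 1_000);
        System.out.println(fresh.map(e -> e.name).orElse("timed out; fall back to event data"));
    }
}
```

    A sequence number is used here instead of updatedAt because comparing wall-clock timestamps taken on different machines is easy to get wrong. That is a design preference, not something the framework requires.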


    Performance

    Materialized view reads are IMap lookups. On a single laptop (M4 MacBook Pro, 16GB, Docker Compose):

    Operation                        Latency
    View read (local partition)      < 0.1 ms
    View read (remote partition)     < 0.5 ms
    View read (near cache)           < 0.01 ms
    View update (pipeline Stage 4)   < 1 ms

    For views that get read far more often than they’re written — which is most of them — near cache is worth enabling:

    hazelcast:
      map:
        Customer_VIEW:
          near-cache:
            name: customer-near-cache
            time-to-live-seconds: 30
            max-idle-seconds: 10
            eviction:
              eviction-policy: LRU
              max-size-policy: ENTRY_COUNT
              size: 10000
    

    This caches view entries in the local memory of the instance doing the read, so repeat reads skip the trip to the owning partition. Reads drop to microseconds. The TTL bounds how long a stale entry can survive, and LRU eviction keeps memory bounded.


    Predefined Queries vs. Ad-Hoc Access

    Everything we’ve built so far serves predefined query patterns — get customer by ID, get enriched order, look up product stock. The services expose exactly the queries their API needs, and those queries are fast because the views are designed for them.

    Sooner or later, though, someone from the business side is going to want more. “Show me all orders over $500 in the last 30 days.” “Which customers changed their address this quarter?” Hazelcast supports SQL queries against IMaps — through Management Center, the command-line client, or third-party tools like DBeaver. The capability is there.

    But I wouldn’t be eager to have my production throughput impacted by someone’s exploratory query with a broad predicate scanning thousands of entries. An analyst’s ad-hoc join returning ten thousand rows is a very different workload than a targeted key lookup, and it can absolutely affect the latency your customers see.

    If you want to give analysts this kind of access — and there are good reasons to — consider WAN replication (a Hazelcast Enterprise feature) to duplicate your IMap data into a secondary cluster dedicated to analytics. Analysts query to their heart’s content; production stays untouched. This is really just CQRS taken one step further. We already separated the write model (events) from the operational read model (views). WAN replication separates the operational read model from the analytical read model. Same principle, another level of isolation.


    Practical Advice

    Keep your views focused. A CustomerView for profile queries, a CustomerByEmailView for authentication, a CustomerOrderSummary for order history. Don’t build a CustomerEverythingView that tries to serve every possible query pattern — it’ll be expensive to update and awkward to query.

    Document what events feed each view. When someone asks “why does the order view show the wrong customer name?” you want to be able to trace it: “The order view is updated by OrderCreated events, and it reads customer data from the Customer view at enrichment time. If the customer name changed after the order was created, the order view still shows the name at order time.”

    Handle missing data gracefully. When the enriched order view looks up a customer and gets null — maybe the customer was deleted, maybe the customer event hasn’t propagated yet — don’t crash. Use a sensible default:

    GenericRecord customer = customerView.get(customerId);
    String customerName = customer != null ?
        customer.getString("name") : "Customer " + customerId;
    

    And think about rebuild cost before your event history gets large. Replaying a million events to rebuild a view takes time. Strategies include periodic snapshots (save the view state and replay only from the snapshot forward), incremental rebuilds (track the last processed event sequence), and parallel rebuilds for independent views.
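
    The snapshot and incremental strategies boil down to the same idea: remember the sequence of the last event a view has absorbed, and replay only what came after it. A minimal sketch, assuming a hypothetical event shape (sequence, key, delta) and an in-memory list as the event log; none of these names come from the framework.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SnapshotRebuildSketch {

    // Hypothetical event shape: sequence number, entity key, amount to add.
    static final class Event {
        final long sequence;
        final String key;
        final long delta;

        Event(long sequence, String key, long delta) {
            this.sequence = sequence;
            this.key = key;
            this.delta = delta;
        }
    }

    // View contents plus the sequence of the last event folded into them.
    static final class Snapshot {
        final Map<String, Long> state;
        final long lastSequence;

        Snapshot(Map<String, Long> state, long lastSequence) {
            this.state = state;
            this.lastSequence = lastSequence;
        }
    }

    // Full rebuild: start from an empty view and replay every event.
    static Snapshot rebuildFull(List<Event> log) {
        return rebuildFrom(new Snapshot(new HashMap<>(), Long.MIN_VALUE), log);
    }

    // Incremental rebuild: copy the snapshot, then replay only newer events.
    // Assumes the log is ordered by sequence.
    static Snapshot rebuildFrom(Snapshot snapshot, List<Event> log) {
        Map<String, Long> view = new HashMap<>(snapshot.state);
        long last = snapshot.lastSequence;
        for (Event e : log) {
            if (e.sequence <= snapshot.lastSequence) {
                continue; // already reflected in the snapshot
            }
            view.merge(e.key, e.delta, Long::sum);
            last = e.sequence;
        }
        return new Snapshot(view, last);
    }

    public static void main(String[] args) {
        List<Event> log = Arrays.asList(
            new Event(1, "prod-789", 10),
            new Event(2, "prod-789", -2),
            new Event(3, "prod-001", 5),
            new Event(4, "prod-789", -1));

        Snapshot atTwo = rebuildFull(log.subList(0, 2));
        Snapshot resumed = rebuildFrom(atTwo, log); // replays only events 3 and 4
        System.out.println(resumed.state.equals(rebuildFull(log).state));
    }
}
```

    The trade-off is that a snapshot is only as trustworthy as the view logic that produced it. After fixing a bug in applyEvent you still need a full replay, or at least a replay from a snapshot taken before the bug could have mattered.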


    That covers the read side of CQRS — how we turn an append-only event stream into fast, queryable state. The views are disposable, rebuildable, and independent. Denormalization is a feature, not a compromise. And the whole thing runs at IMap speed because the read model lives in the same platform as the processing engine.

    Next we’ll move from a single service to many. When a business operation has to update state in two or three different services and any one of them can fail, two-phase commit is off the table and you need a different answer. That’s the saga pattern.


    Next up: The Saga Pattern for Distributed Transactions

    Previous: Building the Event Pipeline with Hazelcast Jet

  • Hazelcast Jet Event Pipelines: A Six-Stage Walkthrough

    Part 2 in the “Building Event-Driven Microservices with Hazelcast” series


    Introduction

    In Part 1 we talked about event sourcing — what it is, why it matters, and how events flow through our framework. What we didn’t talk much about is the thing that actually makes the events flow: the Hazelcast Jet pipeline sitting in the middle of everything.

    Jet is Hazelcast’s stream processing engine, and it’s doing all the heavy lifting here. It reads events as they arrive, pushes them through six processing stages — persist, update view, publish, and so on — and it does this with backpressure handling and sub-millisecond latency because everything stays in-memory. No external message broker. No separate stream processor. Jet is embedded right in Hazelcast, which means your event processing pipeline has direct, local access to the same IMaps that store your events and views.

    Let’s walk through how it works.


    The 6-Stage Pipeline

    [Figure: six-stage Hazelcast Jet pipeline]

    Six stages, each with one task. An event enters on the left and comes out the other side persisted, applied to a view, published to subscribers, and marked complete. If any stage fails, the context carries that information forward so downstream stages can react (usually by skipping).


    Stage 1: Source — Reading from Event Journal

    The pipeline reads events from the pending events IMap, but not by polling it. That would be terrible — you’d burn CPU checking for new entries, you’d introduce latency, and you’d probably miss things under load. Instead, we use the Event Journal.

    The Event Journal is a ring buffer that Hazelcast can maintain for an IMap. Once it is enabled, every write to the map is recorded in the journal, and Jet can stream directly from it. You configure it on the map:

    Config config = new Config();
    config.getMapConfig("Customer_PENDING")
        .getEventJournalConfig()
        .setEnabled(true)
        .setCapacity(10000);
    

    And then the pipeline reads from it:

    Pipeline pipeline = Pipeline.create();
    
    StreamStage<Map.Entry<PartitionedSequenceKey<K>, GenericRecord>> source = pipeline
        .readFrom(Sources.<PartitionedSequenceKey<K>, GenericRecord>mapJournal(
            pendingMapName,
            JournalInitialPosition.START_FROM_OLDEST
        ))
        .withIngestionTimestamps()
        .setName("1-source-pending-events");
    

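    START_FROM_OLDEST means that after a restart the pipeline picks up from the oldest event still held in the journal. Nothing is skipped as long as the journal has not wrapped around and overwritten entries the pipeline had not read yet, so size the capacity with that in mind. withIngestionTimestamps() stamps each event as it enters the pipeline, which we use later for latency tracking.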
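
    Because the journal is a ring buffer, “oldest” is a moving target. The following is a simplified model of that behaviour, not Hazelcast’s implementation: the class and method names are invented for the sketch, and a String array stands in for the journal’s storage.

```java
import java.util.ArrayList;
import java.util.List;

class RingJournalSketch {

    private final String[] slots;
    private long nextSequence = 0; // sequence assigned to the next write

    RingJournalSketch(int capacity) {
        slots = new String[capacity];
    }

    // Records an event, overwriting the oldest slot once the buffer is full.
    long append(String event) {
        slots[(int) (nextSequence % slots.length)] = event;
        return nextSequence++;
    }

    // Sequence of the oldest event that has not been overwritten.
    long oldestSequence() {
        return Math.max(0, nextSequence - slots.length);
    }

    // Reads everything still retained, oldest first.
    List<String> readFromOldest() {
        List<String> out = new ArrayList<>();
        for (long s = oldestSequence(); s < nextSequence; s++) {
            out.add(slots[(int) (s % slots.length)]);
        }
        return out;
    }

    public static void main(String[] args) {
        RingJournalSketch journal = new RingJournalSketch(3);
        for (int i = 0; i < 5; i++) {
            journal.append("event-" + i);
        }
        System.out.println(journal.readFromOldest());
        System.out.println("oldest retained sequence: " + journal.oldestSequence());
    }
}
```

    With a capacity of 3 and five writes, only the last three events survive. It is also worth checking how capacity is divided in your Hazelcast version: as we read the EventJournalConfig documentation, the configured capacity is shared across partitions, so 10,000 entries over the default 271 partitions leaves roughly 36 slots per partition.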

    PartitionedSequenceKey: Why the Map Key Isn’t a Simple String

    You probably noticed the key type is PartitionedSequenceKey<K> rather than, say, a customer ID. There’s a reason for that, and it took us a bit to figure out we needed it.

    Jet guarantees that events within the same Hazelcast partition are processed in order. Events across different partitions? No ordering guarantee. They can arrive in any order.

    That’s fine as long as all events for a given entity share the same key. But consider this: you create an account keyed by customer ID, then immediately deposit funds into it keyed by transaction ID. Those two keys might hash to different partitions. Jet could process the deposit before the account exists. The deposit fails. The customer calls support. Nobody’s having a good day.

    PartitionedSequenceKey separates two concerns that are easy to conflate — the event’s unique identity and the partition it should land on:

    public class PartitionedSequenceKey<K>
            implements PartitionAware<K>, Comparable<PartitionedSequenceKey<K>>,
                       Serializable {
    
        private final long sequence;     // Globally unique FlakeId for ordering
        private final K partitionKey;    // Domain object ID for co-location
    
        @Override
        public K getPartitionKey() {
            return partitionKey;  // Hazelcast routes by this, not the full key
        }
    }
    

    The sequence comes from Hazelcast’s FlakeIdGenerator, which produces cluster-wide unique IDs that are roughly time-ordered (k-ordered, rather than strictly monotonic across members). The partitionKey is the domain object’s ID (customer ID, not transaction ID). Because the key implements PartitionAware, Hazelcast uses getPartitionKey() for partition assignment instead of hashing the whole composite key. So the account creation and the deposit both route to the customer’s partition, and Jet processes them in the order they were written there.
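
    To make the routing concrete, here is a small sketch. It is a simplification: Hazelcast hashes the serialized key, whereas this uses hashCode() modulo the partition count, and the Key class is a stripped-down stand-in for PartitionedSequenceKey.

```java
class PartitionRoutingSketch {

    static final int PARTITION_COUNT = 271; // Hazelcast's default partition count

    // Simplified stand-in for partition assignment. Hazelcast hashes the
    // serialized key; hashCode() is used here only to keep the sketch short.
    static int partitionFor(Object routingKey) {
        return Math.floorMod(routingKey.hashCode(), PARTITION_COUNT);
    }

    // Composite key: unique sequence for identity, entity ID for routing.
    static final class Key {
        final long sequence;
        final String partitionKey;

        Key(long sequence, String partitionKey) {
            this.sequence = sequence;
            this.partitionKey = partitionKey;
        }

        // Mirrors the PartitionAware idea: route by entity ID, ignore the sequence.
        int partition() {
            return partitionFor(partitionKey);
        }
    }

    public static void main(String[] args) {
        Key accountCreated = new Key(1001L, "cust-123");
        Key fundsDeposited = new Key(1002L, "cust-123");

        // Same entity ID, so both events land on the same partition.
        System.out.println(accountCreated.partition() == fundsDeposited.partition());

        // Routing by unrelated IDs carries no such guarantee.
        System.out.println(partitionFor("cust-123") + " vs " + partitionFor("txn-987"));
    }
}
```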

    Why This Is Lock-Free

    There’s something worth understanding about why partition-aware routing gives us ordered, atomic processing — and it’s not locking.

    Hazelcast distributes data across 271 partitions (by default), and each partition is owned by exactly one thread — the partition thread. All operations on keys in that partition are executed by that single thread, sequentially. There’s no concurrent access to fight over, so there’s no need for locks, CAS operations, or synchronized blocks. Two events for customer-123 arrive a millisecond apart? They both route to the same partition, and the partition thread processes them one after the other. Ordering and atomicity come from the threading architecture itself.

    This carries through to the materialized view layer too. When we call executeOnKey with an EntryProcessor in Part 3, that processor runs on the partition thread for that key. It reads the current state, applies the event, writes the new state — all in one shot, on one thread, with no possibility of a conflicting update sneaking in between. It’s atomic by construction, not by locking.

    The practical payoff is concurrency without contention. Thousands of events per second can flow through the pipeline, and as long as they’re for different entities, they’re processed in parallel across different partition threads. Events for the same entity are serialized — but by the architecture, not by a bottleneck. You get high throughput and strict per-entity ordering at the same time, and you didn’t write a single line of synchronization code to get it.
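
    The threading model can be imitated with nothing but the JDK. This is an analogy rather than Hazelcast’s code: a handful of single-thread executors play the partition threads, events are routed by key hash, and the per-key list that records the processing order is never locked. All names are invented for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PartitionThreadsSketch {

    // Applies events 0..eventsPerKey-1 for every key and returns the order
    // in which each key saw them.
    static Map<String, List<Integer>> process(List<String> keys, int eventsPerKey,
                                              int threadCount) {
        ExecutorService[] partitionThreads = new ExecutorService[threadCount];
        for (int i = 0; i < threadCount; i++) {
            partitionThreads[i] = Executors.newSingleThreadExecutor();
        }

        Map<String, List<Integer>> applied = new ConcurrentHashMap<>();
        List<Future<?>> pending = new ArrayList<>();
        try {
            // Interleave keys so events for different entities arrive mixed together.
            for (int n = 0; n < eventsPerKey; n++) {
                for (String key : keys) {
                    final int eventNumber = n;
                    int owner = Math.floorMod(key.hashCode(), threadCount);
                    // The per-key list needs no lock: only its owning thread touches it.
                    pending.add(partitionThreads[owner].submit(() -> {
                        applied.computeIfAbsent(key, k -> new ArrayList<>()).add(eventNumber);
                    }));
                }
            }
            for (Future<?> f : pending) {
                f.get(); // wait for completion; also makes the results visible here
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("event processing failed", e);
        } finally {
            for (ExecutorService t : partitionThreads) {
                t.shutdown();
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> result =
            process(Arrays.asList("cust-1", "cust-2", "cust-3"), 5, 2);
        result.forEach((key, order) -> System.out.println(key + " -> " + order));
    }
}
```

    Each key’s events come out in submission order even though several threads are running, because a given key is only ever handled by one of them.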


    Stage 2: Enrich — Adding Metadata

    Before we do anything with the event, we stamp it with pipeline entry time and extract identification fields. This gives us latency tracking from the moment the event enters the pipeline, not just from when it was created.

    StreamStage<EventContext<K>> enriched = source
        .map(EventSourcingPipeline::enrichEvent)
        .setName("2-enrich-metadata");
    
    private static <K> EventContext<K> enrichEvent(
            Map.Entry<PartitionedSequenceKey<K>, GenericRecord> entry) {
    
        Instant pipelineEntryTime = Instant.now();
        PartitionedSequenceKey<K> key = entry.getKey();
        GenericRecord eventRecord = entry.getValue();
    
        String eventType = extractEventType(eventRecord);
        String eventId = extractEventId(eventRecord);
    
        return new EventContext<>(key, eventRecord, eventType, eventId, pipelineEntryTime);
    }
    

    (Note the static method — that’s not accidental. We’ll come back to why in the architecture section.)

    The EventContext is the envelope that carries the event through all remaining stages:

    private static class EventContext<K> implements Serializable {
    
        final PartitionedSequenceKey<K> key;
        final GenericRecord eventRecord;
        final String eventType;
        final String eventId;
        final Instant pipelineEntryTime;
    
        // Stage completion flags
        boolean persisted;
        boolean viewUpdated;
        boolean published;
    
        // Stage timing
        Instant persistTime;
        Instant viewUpdateTime;
        Instant publishTime;
    }
    

    Each stage sets its own flag and timestamp. If persist fails, persisted stays false, and the view update stage sees that and skips. No event gets partially applied without the context knowing about it.
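
    The listing above does not show how the flags get set, so here is one way it could look. Treat it as a sketch under assumptions: the context is reduced to two stages, the withPersisted and withViewUpdated helpers return a modified copy (the real class may mutate in place instead), and lists stand in for the event store and the view.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

class StageContextSketch {

    // Simplified context: each "with" helper returns a modified copy.
    static final class Ctx {
        final String eventId;
        final boolean persisted;
        final boolean viewUpdated;
        final Instant persistTime;
        final Instant viewUpdateTime;

        Ctx(String eventId) {
            this(eventId, false, false, null, null);
        }

        private Ctx(String eventId, boolean persisted, boolean viewUpdated,
                    Instant persistTime, Instant viewUpdateTime) {
            this.eventId = eventId;
            this.persisted = persisted;
            this.viewUpdated = viewUpdated;
            this.persistTime = persistTime;
            this.viewUpdateTime = viewUpdateTime;
        }

        Ctx withPersisted(boolean ok, Instant at) {
            return new Ctx(eventId, ok, viewUpdated, at, viewUpdateTime);
        }

        Ctx withViewUpdated(boolean ok, Instant at) {
            return new Ctx(eventId, persisted, ok, persistTime, at);
        }
    }

    // Persist stand-in: succeeds only if the store is available.
    static Ctx persist(Ctx ctx, List<String> store, boolean storeAvailable) {
        if (!storeAvailable) {
            return ctx.withPersisted(false, Instant.now());
        }
        store.add(ctx.eventId);
        return ctx.withPersisted(true, Instant.now());
    }

    // View update stand-in: skipped unless the persist stage succeeded.
    static Ctx updateView(Ctx ctx, List<String> view) {
        if (!ctx.persisted) {
            return ctx;
        }
        view.add(ctx.eventId);
        return ctx.withViewUpdated(true, Instant.now());
    }

    public static void main(String[] args) {
        List<String> store = new ArrayList<>();
        List<String> view = new ArrayList<>();

        Ctx ok = updateView(persist(new Ctx("evt-1"), store, true), view);
        Ctx failed = updateView(persist(new Ctx("evt-2"), store, false), view);

        System.out.println("evt-1 viewUpdated=" + ok.viewUpdated);
        System.out.println("evt-2 viewUpdated=" + failed.viewUpdated);
        System.out.println("view=" + view);
    }
}
```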


    Stage 3: Persist — Writing to the Event Store

    This is the durability step. Once an event is in the event store, it’s the permanent record of what happened. Everything else — views, notifications, completions — can be rebuilt from events. This one can’t.

    EventStoreServiceCreator<D, K, E> eventStoreCreator =
        new EventStoreServiceCreator<>(domainName);
    
    ServiceFactory<?, HazelcastEventStore<D, K, E>> eventStoreFactory =
        ServiceFactories.<HazelcastEventStore<D, K, E>>sharedService(eventStoreCreator)
            .toNonCooperative();
    
    StreamStage<EventContext<K>> persisted = enriched
        .mapUsingService(eventStoreFactory, (store, ctx) -> {
            try {
                Instant start = Instant.now();
                store.append(ctx.key, ctx.eventRecord);
                return ctx.withPersisted(true, start);
            } catch (Exception e) {
                logger.warn("Failed to persist event: {}", ctx.eventId, e);
                return ctx.withPersisted(false, Instant.now());
            }
        })
        .setName("3-persist-event-store");
    

    The ServiceFactory Pattern

    Why the ServiceFactory indirection instead of just passing in an EventStore reference? Because Jet pipelines are distributed. The pipeline definition gets serialized and shipped to whatever nodes Jet decides to run it on. If your lambda captures a reference to a Spring-managed EventStore bean — which holds a HazelcastInstance, database connections, and who knows what else — that serialization is going to fail spectacularly at runtime.

    ServiceFactory solves this: you give Jet a recipe for creating the service, and it creates an instance on each node where the pipeline actually runs.

    public class EventStoreServiceCreator<D extends DomainObject<K>, K,
            E extends DomainEvent<D, K>>
            implements FunctionEx<ProcessorSupplier.Context, HazelcastEventStore<D, K, E>> {
    
        private final String domainName;
    
        public EventStoreServiceCreator(String domainName) {
            this.domainName = domainName;
        }
    
        @Override
        public HazelcastEventStore<D, K, E> applyEx(ProcessorSupplier.Context ctx) {
            HazelcastInstance hz = ctx.hazelcastInstance();
            return new HazelcastEventStore<>(hz, domainName);
        }
    }
    

    The creator itself is just a domain name string — easily serializable. The heavy objects get created on the target node using that node’s local HazelcastInstance.
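
    The recipe idea can be demonstrated with plain Java serialization. This is a sketch, not Jet’s machinery: Store is a stand-in for a heavy service that cannot be serialized, StoreRecipe is a made-up counterpart to the creator class, and roundTrip simulates shipping an object to another node by writing it to bytes and reading it back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class ServiceRecipeSketch {

    // Stand-in for a heavy service. Deliberately not Serializable.
    static final class Store {
        final String domainName;

        Store(String domainName) {
            this.domainName = domainName;
        }
    }

    // The recipe: only a String has to travel between nodes.
    static final class StoreRecipe implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String domainName;

        StoreRecipe(String domainName) {
            this.domainName = domainName;
        }

        // Runs on the receiving side and builds the service locally.
        Store create() {
            return new Store(domainName);
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static boolean canSerialize(Object value) {
        try {
            serialize(value);
            return true;
        } catch (IOException e) {
            return false; // includes NotSerializableException
        }
    }

    // Simulates shipping an object to another node and reading it back.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T value) {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(serialize(value)))) {
            return (T) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("service serializable? " + canSerialize(new Store("Customer")));
        StoreRecipe shipped = roundTrip(new StoreRecipe("Customer"));
        System.out.println("created remotely for: " + shipped.create().domainName);
    }
}
```

    The service itself fails the serialization check, while the recipe survives the trip and builds a fresh service on arrival. That is the same division of labour the ServiceFactory sets up.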


    Stage 4: Update View — Applying to Materialized View

    With the event persisted, we update the materialized view. This is where the event’s apply logic runs — transforming the event into current state.

    ViewUpdaterServiceCreator<K> serviceCreator =
        new ViewUpdaterServiceCreator<>(domainName, viewUpdaterClass);
    
    ServiceFactory<?, ViewUpdater<K>> viewUpdaterServiceFactory =
        ServiceFactories.<ViewUpdater<K>>sharedService(serviceCreator)
            .toNonCooperative();
    
    StreamStage<EventContext<K>> viewUpdated = persisted
        .mapUsingService(viewUpdaterServiceFactory, (viewUpdater, ctx) -> {
            if (!ctx.persisted) {
                return ctx;  // Skip if persist failed
            }
            try {
                Instant start = Instant.now();
                viewUpdater.updateDirect(ctx.eventRecord);
                return ctx.withViewUpdated(true, start);
            } catch (Exception e) {
                logger.warn("Failed to update view for event: {}", ctx.eventId, e);
                return ctx.withViewUpdated(false, Instant.now());
            }
        })
        .setName("4-update-materialized-view");
    

    The ViewUpdater is an abstract class — each domain implements its own:

    public abstract class ViewUpdater<K> implements Serializable {
    
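        // transient: the store is never serialized along with the updater
        protected final transient HazelcastViewStore<K> viewStore;
    
        protected ViewUpdater(HazelcastViewStore<K> viewStore) {
            this.viewStore = viewStore;
        }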
    
        protected abstract K extractKey(GenericRecord eventRecord);
    
        protected abstract GenericRecord applyEvent(
            GenericRecord eventRecord,
            GenericRecord currentState);
    
        public GenericRecord updateDirect(GenericRecord eventRecord) {
            K key = extractKey(eventRecord);
            GenericRecord currentState = viewStore.get(key).orElse(null);
            GenericRecord updatedState = applyEvent(eventRecord, currentState);
    
            if (updatedState != null) {
                viewStore.put(key, updatedState);
            } else if (currentState != null) {
                viewStore.remove(key);  // Deletion event
            }
    
            return updatedState;
        }
    }
    

    And here’s what a concrete one looks like for customers:

    public class CustomerViewUpdater extends ViewUpdater<String> {
    
        @Override
        protected String extractKey(GenericRecord eventRecord) {
            return eventRecord.getString("key");
        }
    
        @Override
        protected GenericRecord applyEvent(GenericRecord event, GenericRecord current) {
            String eventType = event.getString("eventType");
    
            return switch (eventType) {
                case "CustomerCreated" -> createCustomerView(event);
                case "CustomerUpdated" -> updateCustomerView(event, current);
                case "CustomerStatusChanged" -> changeStatus(event, current);
                case "CustomerDeleted" -> null;  // Return null to delete
                default -> current;
            };
        }
    
        private GenericRecord createCustomerView(GenericRecord event) {
            return GenericRecordBuilder.compact("Customer")
                .setString("customerId", event.getString("key"))
                .setString("email", event.getString("email"))
                .setString("name", event.getString("name"))
                .setString("address", event.getString("address"))
                .setString("status", "ACTIVE")
                .setInt64("createdAt", Instant.now().toEpochMilli())
                .build();
        }
    }
    

    Returning null from applyEvent is the deletion convention — the updater removes the entry from the view. Everything else either creates or modifies the view record.


    Stage 5: Publish — Notifying Other Services via ITopic

    Once the view is updated, we publish the event so other services can react. This is where cross-service communication happens. Saga listeners subscribe to topics like “OrderCreated” or “StockReserved” and kick off their own workflows when those events arrive.

    EventTopicPublisherServiceCreator publisherCreator = new EventTopicPublisherServiceCreator();
    ServiceFactory<?, EventTopicPublisherServiceCreator.TopicPublisher> publisherFactory =
        ServiceFactories.<EventTopicPublisherServiceCreator.TopicPublisher>sharedService(publisherCreator)
            .toNonCooperative();
    
    StreamStage<EventContext<K>> published = viewUpdated
        .mapUsingService(publisherFactory, (publisher, ctx) -> {
            if (!ctx.viewUpdated) {
                return ctx;
            }
            try {
                publisher.publish(ctx.eventType, ctx.eventRecord);
                return ctx.withPublished(true, Instant.now());
            } catch (Exception e) {
                logger.warn("Failed to publish event {} to topic: {}",
                    ctx.eventId, ctx.eventType, e);
                return ctx.withPublished(false, Instant.now());
            }
        })
        .setName("5-publish-to-subscribers");
    

    The TopicPublisher is thin — it resolves the ITopic by event type name and publishes:

    public class EventTopicPublisherServiceCreator
            implements FunctionEx<ProcessorSupplier.Context, TopicPublisher> {
    
        @Override
        public TopicPublisher applyEx(ProcessorSupplier.Context ctx) {
            return new TopicPublisher(ctx.hazelcastInstance());
        }
    
        public static class TopicPublisher {
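            private final HazelcastInstance hazelcast;
    
            // Called by applyEx above with the instance the processor runs on
            TopicPublisher(HazelcastInstance hazelcast) {
                this.hazelcast = hazelcast;
            }
    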
            public void publish(String topicName, GenericRecord record) {
                ITopic<GenericRecord> topic = hazelcast.getTopic(topicName);
                topic.publish(record);
            }
        }
    }
    

    Each event type gets its own ITopic. The Order service subscribes to StockReserved and PaymentProcessed; the Inventory service subscribes to OrderPlaced. Nobody hears events they can’t act on.


    Stage 6: Complete — Signaling Completion (and Smuggling Out Metrics)

    The last stage writes a completion record so the calling code knows the event made it through. But there’s a twist.

    Jet pipelines run inside Hazelcast’s execution engine. They don’t have access to Spring’s MeterRegistry, or any external metrics system. The pipeline can time every stage internally — it does, that’s what all those Instant.now() calls are for — but it has no way to report those timings to Prometheus or Grafana directly.

    So we cheat. The completion record carries the timestamps out:

    published.map(ctx -> {
        long completionMs = Instant.now().toEpochMilli();
        GenericRecord completion = GenericRecordBuilder.compact("PipelineCompletion")
                .setString("eventId", ctx.eventId)
                .setString("eventType", ctx.eventType)
                .setBoolean("persisted", ctx.persisted)
                .setBoolean("viewUpdated", ctx.viewUpdated)
                .setBoolean("published", ctx.published)
                .setInt64("pipelineEntryMs", ctx.pipelineEntryTime.toEpochMilli())
                .setInt64("persistStartMs",
                    ctx.persistTime != null ? ctx.persistTime.toEpochMilli() : 0L)
                .setInt64("viewUpdateStartMs",
                    ctx.viewUpdateTime != null ? ctx.viewUpdateTime.toEpochMilli() : 0L)
                .setInt64("publishTimeMs",
                    ctx.publishTime != null ? ctx.publishTime.toEpochMilli() : 0L)
                .setInt64("completionMs", completionMs)
                .build();
        return Map.entry(ctx.eventId, completion);
    })
    .writeTo(Sinks.map(completionsMapName))
    .setName("6-signal-completion");
    

    The controller picks up the completion, extracts the timestamps, and reports them to Micrometer. The pipeline instruments itself; the controller publishes the metrics. It also gets the stage success flags — persisted, viewUpdated, published — so it knows whether the event made it through clean or had partial failures along the way.
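
    To make the timestamp extraction concrete, here is a rough sketch of turning the completion record’s fields into per-stage durations. It uses a plain Map<String, Long> in place of the GenericRecord, takes the field names from the Stage 6 code, and assumes each timestamp marks the moment its stage finished. The StageTimings class and its durations method are hypothetical names, not part of the framework.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class StageTimings {
    // Field names from the completion record, in pipeline order.
    static final String[] FIELDS = {
        "pipelineEntryMs", "persistStartMs", "viewUpdateStartMs", "publishTimeMs", "completionMs"
    };

    // Elapsed millis between consecutive recorded timestamps.
    // A value of 0 means the stage never recorded a time, so it is skipped.
    static Map<String, Long> durations(Map<String, Long> completion) {
        Map<String, Long> result = new LinkedHashMap<>();
        long previous = completion.getOrDefault(FIELDS[0], 0L);
        for (int i = 1; i < FIELDS.length; i++) {
            long current = completion.getOrDefault(FIELDS[i], 0L);
            if (current == 0L) {
                continue;
            }
            result.put(FIELDS[i], current - previous);
            previous = current;
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> completion = Map.of(
            "pipelineEntryMs", 1_000L, "persistStartMs", 1_002L,
            "viewUpdateStartMs", 1_003L, "publishTimeMs", 0L, "completionMs", 1_007L);
        // prints {persistStartMs=2, viewUpdateStartMs=1, completionMs=4}
        System.out.println(durations(completion));
    }
}
```

    Each resulting duration could then be recorded against a Micrometer Timer tagged with the stage name. A stage that never ran drops out of the result instead of producing a bogus negative duration.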

    The controller side looks like this:

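    public CompletableFuture<EventCompletion> handleEvent(DomainEvent event, UUID correlationId) {
        // key, eventId and eventRecord are derived from the event (omitted here)
        pendingEventsMap.set(key, eventRecord);
    
        return CompletableFuture.supplyAsync(() -> {
            int maxWait = 5000;
            int elapsed = 0;
    
            try {
                while (elapsed < maxWait) {
                    GenericRecord completion = completionsMap.get(key);
                    if (completion != null) {
                        return EventCompletion.success(eventId);
                    }
                    Thread.sleep(10);
                    elapsed += 10;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve the interrupt flag
                throw new CompletionException(e);
            }
            // A Supplier can't throw checked exceptions, so wrap the timeout
            throw new CompletionException(new TimeoutException("Event processing timeout"));
        });
    }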
    

    Write the event to the pending map, then poll the completions map until the pipeline writes the result. Five seconds is plenty — the pipeline usually finishes in under a millisecond.


    Jet Pipeline Traps (and How We Avoid Them)

    If you’re used to writing normal Java code, Jet pipelines will surprise you in a few unpleasant ways. These are the ones that cost us the most time.

    The Serialization Trap

    Every lambda in the pipeline must be serializable, because Jet ships the pipeline definition to whichever nodes it decides to run on. If your lambda captures this, and this has a HazelcastInstance field, or a Spring ApplicationContext, or anything else that doesn’t serialize — you get a NotSerializableException at runtime. Not at compile time. At runtime. In production, ideally on a Friday.

    Three rules keep you out of trouble:

    Use static methods as pipeline stages (they don’t capture this). Use ServiceFactory for anything that needs a Hazelcast instance or other heavy resource. And if you must capture a value, make sure it’s a final local variable that’s actually serializable.

    // This will ruin your weekend
    .map(event -> this.eventStore.append(event))
    
    // This will not
    .mapUsingService(eventStoreFactory, (store, event) -> store.append(event))
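
    You can reproduce the trap without Jet at all. The sketch below uses plain Java serialization: SerFn is a hypothetical stand-in for Jet’s serializable function interfaces, and SerializationTrap plays the part of a pipeline class holding a non-serializable resource. A lambda that touches an instance field drags this along with it; a static method reference captures nothing.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class SerializationTrap {
    // A Function that is also Serializable, in the spirit of Jet's function types.
    interface SerFn<T, R> extends Function<T, R>, Serializable {}

    // Stands in for a HazelcastInstance or ApplicationContext field.
    private final Object resource = new Object();

    // Reads an instance field, so the lambda captures `this`,
    // and this class does not implement Serializable.
    SerFn<String, String> capturing() {
        return event -> event + resource.hashCode();
    }

    static String upper(String event) {
        return event.toUpperCase();
    }

    // Static method reference: nothing captured, nothing to serialize but the reference.
    static SerFn<String, String> nonCapturing() {
        return SerializationTrap::upper;
    }

    static boolean canSerialize(Object fn) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(fn);
            return true;
        } catch (IOException e) { // NotSerializableException is an IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new SerializationTrap().capturing())); // false
        System.out.println(canSerialize(nonCapturing()));                      // true
    }
}
```

    A check like canSerialize makes a cheap unit test for any lambda you plan to hand to a pipeline stage.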
    

    The Try-Catch That Catches Nothing

    This one’s sneaky. Your instinct when writing the pipeline is to wrap the whole thing in a try-catch:

    try {
        Pipeline pipeline = Pipeline.create();
        // ... define all six stages ...
        hazelcast.getJet().newJob(pipeline, jobConfig);
    } catch (Exception e) {
        logger.error("Pipeline failed!", e);  // This catches almost nothing useful
    }
    

    That try-catch protects you during pipeline assembly and job submission: if a lambda fails Jet’s serializability check or you pass an invalid config, you’ll catch it. But assembly isn’t where things go wrong. Things go wrong during execution, and by that point your code is somewhere else entirely.

    When you call newJob(), Jet takes your pipeline definition, converts each stage into a processor, and distributes those processors across the cluster. The actual event processing happens inside those distributed processors, on whichever nodes Jet assigned them to. You’re not just off the end of that try-catch block — you might not even be on the same machine.

    That’s why every stage in our pipeline has its own try-catch inside the lambda:

    .mapUsingService(eventStoreFactory, (store, ctx) -> {
        try {
            store.append(ctx.key, ctx.eventRecord);
            return ctx.withPersisted(true, Instant.now());
        } catch (Exception e) {
            logger.error("Persist failed for event {}", ctx.eventId, e);
            return ctx.withPersisted(false, Instant.now());
        }
    })
    

    The error handling has to live where the code actually runs — inside each processor, on whatever node Jet put it on. Downstream stages check the context flags before doing work. If persist failed, the view update skips. If the view update failed, publish skips. The completion record captures exactly what happened, so you can diagnose partial failures from the metrics.
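
    The skip-on-failure chain can be modeled in a few lines of plain Java. In this sketch, Ctx is a cut-down stand-in for EventContext that carries only the three success flags, and the stages are plain Runnables; both are assumptions made to keep the example self-contained.

```java
class StageFlags {
    // Simplified stand-in for EventContext: only the three success flags.
    record Ctx(boolean persisted, boolean viewUpdated, boolean published) {}

    // Runs a stage action and converts any exception into a false flag,
    // the same shape as the try-catch inside each pipeline lambda.
    static boolean attempt(Runnable action) {
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    static Ctx run(Runnable persist, Runnable updateView, Runnable publish) {
        Ctx ctx = new Ctx(attempt(persist), false, false);
        // Each stage checks the previous stage's flag before doing any work.
        if (ctx.persisted()) {
            ctx = new Ctx(true, attempt(updateView), false);
        }
        if (ctx.viewUpdated()) {
            ctx = new Ctx(true, true, attempt(publish));
        }
        return ctx;
    }

    public static void main(String[] args) {
        Ctx ctx = run(
            () -> {},
            () -> { throw new IllegalStateException("view store down"); },
            () -> {});
        // prints Ctx[persisted=true, viewUpdated=false, published=false]
        System.out.println(ctx);
    }
}
```

    The failed view update leaves published at false without the publish action ever running, and that is exactly the partial-failure signature the completion record exposes.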

    Cooperative, Non-Cooperative, Shared, and Non-Shared

    Jet services have two dimensions you need to get right, and they’re independent of each other.

    Cooperative vs. non-cooperative is about blocking. Jet’s default threading model is cooperative — stages share a thread pool and are expected to yield quickly. If a stage does I/O (writes to the event store, makes a network call, publishes to ITopic), it blocks the thread. Mark it non-cooperative so Jet gives it a dedicated thread:

    ServiceFactory<?, EventStore> factory = ServiceFactories
        .sharedService(creator)
        .toNonCooperative();
    

    Forget this and your pipeline stalls under load. The cooperative thread pool gets monopolized by I/O-bound stages that never yield, and throughput craters.

    Shared vs. non-shared is about thread safety. A shared service creates one instance per node and hands it to all processors on that node — multiple threads will call it concurrently. A non-shared service creates a separate instance for each processor, so no concurrent access.

    Our EventStore and TopicPublisher are backed by Hazelcast data structures (IMap, ITopic), which are thread-safe, so they’re shared. But if you had a service that accumulated state in a local buffer, or maintained a non-thread-safe connection like a raw JDBC Connection, you’d mark it non-shared:

    // Thread-safe service — one instance shared across processors on this node
    ServiceFactories.sharedService(creator).toNonCooperative();
    
    // NOT thread-safe — each processor gets its own instance
    ServiceFactories.nonSharedService(creator).toNonCooperative();
    

    Get the combination wrong and you get either unnecessary memory overhead (non-shared when shared would do) or, worse, data corruption from concurrent access to a non-thread-safe service (shared when it shouldn’t be). That second one is the kind of bug that shows up intermittently under load and makes you question your career choices.
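
    If the shared/non-shared distinction feels abstract, it comes down to how often the creator function runs. The sketch below is a plain-Java model of the instance counts only, not Jet’s implementation; SharingSketch and its two methods are hypothetical names, and a StringBuilder plays the non-thread-safe local buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class SharingSketch {
    // Shared: the creator runs once and every processor receives the same instance.
    static <S> List<S> shared(Supplier<S> creator, int processors) {
        S instance = creator.get();
        List<S> handedOut = new ArrayList<>();
        for (int i = 0; i < processors; i++) {
            handedOut.add(instance);
        }
        return handedOut;
    }

    // Non-shared: the creator runs once per processor.
    static <S> List<S> nonShared(Supplier<S> creator, int processors) {
        List<S> handedOut = new ArrayList<>();
        for (int i = 0; i < processors; i++) {
            handedOut.add(creator.get());
        }
        return handedOut;
    }

    public static void main(String[] args) {
        List<StringBuilder> a = shared(StringBuilder::new, 4);
        List<StringBuilder> b = nonShared(StringBuilder::new, 4);
        System.out.println(a.get(0) == a.get(3)); // true: one buffer touched by four processors
        System.out.println(b.get(0) == b.get(3)); // false: each processor owns its buffer
    }
}
```

    With the shared variant, four processor threads appending to the same StringBuilder is the corruption scenario described above; with the non-shared variant the cost is four buffers instead of one.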


    Lifecycle Management

    Starting and stopping the pipeline:

    public class EventSourcingPipeline<D, K, E> {
    
        private volatile Job pipelineJob;
    
        public Job start() {
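            // A job that is neither COMPLETED nor FAILED is still alive
            // (cancelled jobs report FAILED), so don't start a second one
            JobStatus status = pipelineJob != null ? pipelineJob.getStatus() : null;
            if (status != null && status != JobStatus.COMPLETED && status != JobStatus.FAILED) {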
                logger.warn("Pipeline already running");
                return pipelineJob;
            }
    
            Pipeline pipeline = buildPipeline();
            JobConfig jobConfig = new JobConfig()
                .setName(domainName + "-EventSourcingPipeline");
    
            pipelineJob = hazelcast.getJet().newJob(pipeline, jobConfig);
            logger.info("Started pipeline for {}, jobId: {}",
                domainName, pipelineJob.getId());
            return pipelineJob;
        }
    
        public void stop() {
            if (pipelineJob != null) {
                pipelineJob.cancel();
                logger.info("Stopped pipeline for {}", domainName);
            }
        }
    }
    

    Each domain (Customer, Order, Inventory) gets its own pipeline instance running as a Jet job. They’re independent — you can restart the Order pipeline without affecting Customers.


    Performance

    All of this runs on a single Apple MacBook Pro (M4, 16GB) with three Hazelcast instances in Docker Compose:

    Metric           Value
    Throughput       100,000+ events/sec
    P50 Latency      < 0.3ms
    P99 Latency      < 1ms
    P99.9 Latency    < 5ms

    That’s the materialized view layer — events flowing through the pipeline and out the other side. It’s fast because everything is in-memory, Jet parallelizes across partitions, the Event Journal provides non-blocking streaming, and views and events live on the same cluster (no network hop for the view update).

    We verify it with a load test that submits 100K events and waits for all completions:

    // Load test
    @Test
    void loadTest_100KEvents() {
        int eventCount = 100_000;
        List<CompletableFuture<?>> futures = new ArrayList<>();
        long start = System.nanoTime();
    
        for (int i = 0; i < eventCount; i++) {
            CustomerCreatedEvent event = createTestEvent(i);
            futures.add(controller.handleEvent(event, UUID.randomUUID()));
        }
    
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    
        long duration = System.nanoTime() - start;
        double tps = eventCount / (duration / 1_000_000_000.0);
        System.out.printf("Processed %d events in %dms (%.0f TPS)%n",
            eventCount, duration / 1_000_000, tps);
    }
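
    The load test above reports throughput only. Latency percentiles like those in the table need per-event timings. Assuming those have been collected in nanoseconds (the test as shown does not record them), a nearest-rank percentile is enough for a first look. The LatencyPercentiles class is a hypothetical helper, and the data in main is synthetic.

```java
import java.util.Arrays;

class LatencyPercentiles {
    // Nearest-rank percentile over an ascending-sorted array:
    // the smallest sample such that at least p percent of samples are <= it.
    static long percentile(long[] sortedNanos, double p) {
        int rank = (int) Math.ceil(p * sortedNanos.length / 100.0);
        return sortedNanos[Math.max(rank, 1) - 1];
    }

    public static void main(String[] args) {
        // Synthetic latencies: 1 µs, 2 µs, ... 1000 µs, expressed in nanoseconds.
        long[] latencies = new long[1000];
        for (int i = 0; i < latencies.length; i++) {
            latencies[i] = (i + 1) * 1_000L;
        }
        Arrays.sort(latencies);
        System.out.println(percentile(latencies, 50));   // 500000
        System.out.println(percentile(latencies, 99));   // 990000
        System.out.println(percentile(latencies, 99.9)); // 999000
    }
}
```

    For long-running measurements a histogram (Micrometer’s Timer can publish percentiles) avoids holding every sample in memory, but a sorted array is hard to beat for a one-off test.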
    

    Monitoring

    The pipeline metrics — events processed, processing time histograms, pending event counts — feed into Micrometer and from there to Prometheus and Grafana. We’ll dig into the observability setup in a later post, but here’s the shape of it:

    public class PipelineMetrics {
    
        private final Counter eventsProcessed;
        private final Timer eventProcessingTime;
    
        public PipelineMetrics(MeterRegistry registry, String domainName) {
            this.eventsProcessed = Counter.builder("events.processed")
                .tag("domain", domainName)
                .register(registry);
    
            this.eventProcessingTime = Timer.builder("events.processing.time")
                .tag("domain", domainName)
                .register(registry);
        }
    }
    

    Remember how Stage 6 smuggles timing data out in the completion record? This is where it ends up — the controller reads the timestamps, calculates durations, and records them here.


    That’s the pipeline. Six stages, each with one task, connected by an EventContext that carries the event and its processing history all the way through. The design decisions that matter most: PartitionedSequenceKey for ordering, ServiceFactory for distributed service access, static methods to dodge serialization traps, and a completion record that doubles as a metrics transport.

    Next up we’ll look at what happens after Stage 4 in more detail — how materialized views are designed, queried, and rebuilt.


    Next up: Materialized Views for Fast Queries

    Previous: Event Sourcing with Hazelcast: A Practical Introduction