RxJS in Action: Creating a Robust Console Log Upload Service with Priority Handling

Last updated: July 25, 2025
The article details building a robust log upload service with RxJS, emphasizing priority-based queuing. It captures console output (log, error, warn), batches it for efficient uploads, and prioritizes errors over warnings. Key features include a single active upload at a time, immediate error uploads, delayed warning uploads, and queue management so that logs are not lost when an upload is canceled. The implementation intercepts console methods, buffers logs in a local in-memory store, and leverages RxJS operators like switchMap and debounceTime. Additional logic handles error priority, buffered warnings, and the transitions between busy and idle states.

Recently, I needed to implement a sophisticated log upload system with priority-based queuing using RxJS. The challenge was to create a service that captures console output from developer tools and uploads it to a remote server with intelligent batching and priority handling.

The Challenge

Our application needed a robust logging system that could:

  • Capture all console activities (console.log, console.error, console.warn)
  • Upload logs efficiently without overwhelming the server
  • Prioritize critical errors over routine warnings

System Requirements

After analyzing the requirements, I identified six key behaviors the system needed to implement:

  1. Batch Processing: Each upload should gather all logs stored locally up to that point. Once successful, the local storage gets cleared to prevent memory bloat.

  2. Single Active Upload: Only one upload request can be active at a time. New requests should cancel ongoing ones, but no logs should be lost since canceled request data remains in storage for the next upload.

  3. High Priority Errors: Logs triggered by console.error are treated as high priority and must be uploaded immediately.

  4. Delayed Warnings: Logs triggered by console.warn are low priority and should be delayed by one minute before uploading. If multiple warnings occur within that timeframe, only the latest one should trigger the upload, ensuring we capture the most recent state.

  5. Priority Override: Error uploads should override and cancel any pending warning upload triggers. Since immediate error uploads already include the latest logs, separate warning uploads become redundant.

  6. Queue Management: If an error upload is in progress when a warning is triggered, the warning should wait until the error upload completes before initiating its own upload.

This design ensures efficient log handling while prioritizing critical errors over routine warnings. Let's dive into the RxJS implementation.

Implementation Overview

The solution involves three main components:

  1. Console method interception
  2. Local log storage management
  3. Priority-based upload orchestration

Let's build this step by step.

Step 1: Console Method Interception

First, we need to intercept the native console methods to capture all logging activity. This involves creating a service that overrides the default console behavior while preserving the original functionality.

```typescript
import { Injectable } from '@angular/core';
import { Observable, Subject } from 'rxjs';

export type LogEvent = {
  methodName: string;
  loggerName: string;
  msgs: unknown[];
};

@Injectable({
  providedIn: 'root',
})
export class LogReportService {
  // Keep references to the native methods so they can be called and restored later
  private originalConsoleLog = window.console.log;
  private originalConsoleError = window.console.error;
  private originalConsoleWarn = window.console.warn;

  private triggerSrc = new Subject<LogEvent>();

  constructor() {
    this.startLogging();
  }

  get logEvents$(): Observable<LogEvent> {
    return this.triggerSrc.asObservable();
  }

  startLogging(): void {
    this.setupConsoleOverrides();
  }

  stopLogging(): void {
    this.restoreConsole();
  }

  destroy(): void {
    this.stopLogging();
    this.triggerSrc.complete();
  }

  private setupConsoleOverrides(): void {
    window.console.log = (...args) => {
      this.originalConsoleLog.apply(window.console, args);
      this.triggerSrc.next({
        methodName: 'log',
        loggerName: 'console',
        msgs: args,
      });
    };

    window.console.error = (...args) => {
      this.originalConsoleError.apply(window.console, args);
      this.triggerSrc.next({
        methodName: 'error',
        loggerName: 'console',
        msgs: args,
      });
    };

    window.console.warn = (...args) => {
      this.originalConsoleWarn.apply(window.console, args);
      this.triggerSrc.next({
        methodName: 'warn',
        loggerName: 'console',
        msgs: args,
      });
    };
  }

  private restoreConsole(): void {
    window.console.log = this.originalConsoleLog;
    window.console.error = this.originalConsoleError;
    window.console.warn = this.originalConsoleWarn;
  }
}
```
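The same interception idea can be tried outside Angular. The following is a minimal sketch, not part of the service above: `patchConsole`, `ConsoleLike`, and the fake console are hypothetical names introduced for illustration. It patches any console-like object, returns a restore function, and adds a small re-entrancy guard so that a log call made while an event is being emitted is not reported a second time.

```typescript
// Hypothetical shape; mirrors the article's LogEvent type.
type LogEvent = { methodName: string; loggerName: string; msgs: unknown[] };
type Method = 'log' | 'warn' | 'error';
type ConsoleLike = Record<Method, (...args: unknown[]) => void>;

/**
 * Patches the given console-like object and returns a function that undoes the patch.
 * Calls made while `emit` is running still reach the original method,
 * but are not reported again.
 */
function patchConsole(target: ConsoleLike, emit: (e: LogEvent) => void): () => void {
  const methods: Method[] = ['log', 'warn', 'error'];
  const originals = {} as ConsoleLike;
  let emitting = false; // re-entrancy guard

  for (const name of methods) {
    const original = target[name];
    originals[name] = original;
    target[name] = (...args: unknown[]) => {
      original.apply(target, args);
      if (emitting) return; // avoid a feedback loop if emit() itself logs
      emitting = true;
      try {
        emit({ methodName: name, loggerName: 'console', msgs: args });
      } finally {
        emitting = false;
      }
    };
  }

  return () => {
    for (const name of methods) target[name] = originals[name];
  };
}

// Usage with a fake console, so nothing global is touched.
const printed: string[] = [];
const fakeConsole: ConsoleLike = {
  log: (...a) => printed.push('log:' + a.join(' ')),
  warn: (...a) => printed.push('warn:' + a.join(' ')),
  error: (...a) => printed.push('error:' + a.join(' ')),
};

const captured: LogEvent[] = [];
const restore = patchConsole(fakeConsole, e => {
  captured.push(e);
  fakeConsole.log('inside emit'); // printed, but not captured again
});

fakeConsole.error('boom', 42);
restore();
fakeConsole.warn('after restore'); // printed, no longer captured
```

Returning the restore function from the patch call keeps the original references private to the closure, which may be convenient when the interception has to be switched on and off in tests.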

Step 2: Local Log Storage Management

Next, we need a robust local storage system to buffer logs before uploading. For simplicity, we'll use an in-memory Map structure, but in production, you might want to consider IndexedDB for persistence across browser sessions.

The storage system needs to:

  • Generate unique IDs for each log entry
  • Store logs with timestamps
  • Provide methods to retrieve and clear the log history

```typescript
type StoredLogEntry = {
  id: string;
  timestamp: Date;
  logEvent: LogEvent;
};

class LogStorage {
  private logMap = new Map<string, StoredLogEntry>();

  /**
   * Generates a unique ID from log messages
   */
  private generateLogId(msgs: unknown[]): string {
    const msgString = msgs
      .map(msg => (typeof msg === 'object' ? JSON.stringify(msg) : String(msg)))
      .join(' ');

    // Create a simple hash from the message string and timestamp
    const timestamp = Date.now();
    let hash = 0;
    const combined = msgString + timestamp;

    for (let i = 0; i < combined.length; i++) {
      const char = combined.charCodeAt(i);
      hash = (hash << 5) - hash + char;
      hash = hash & hash; // Convert to 32-bit integer
    }

    return Math.abs(hash).toString(36) + timestamp.toString(36);
  }

  /**
   * Adds a log entry to the storage
   */
  addLog(logEvent: LogEvent): string {
    const id = this.generateLogId(logEvent.msgs);
    const entry: StoredLogEntry = {
      id,
      timestamp: new Date(),
      logEvent,
    };
    this.logMap.set(id, entry);
    return id;
  }

  /**
   * Gets the history of all stored logs
   */
  getHistory(): StoredLogEntry[] {
    return Array.from(this.logMap.values()).sort(
      (a, b) => a.timestamp.getTime() - b.timestamp.getTime(),
    );
  }

  /**
   * Clears all stored log history
   */
  clearHistory(): void {
    this.logMap.clear();
  }
}
```
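Two edge cases in the ID generation above are worth keeping in mind. JSON.stringify throws on objects with circular references, and two identical messages logged within the same millisecond hash to the same ID, so the second Map entry overwrites the first. The sketch below shows one possible way around both. `safeStringify` and `createIdGenerator` are hypothetical helpers, not part of the service.

```typescript
/**
 * Serializes a value without throwing on circular references.
 * Any object that was already visited (circular or merely repeated)
 * is replaced by the placeholder string "[Circular]".
 */
function safeStringify(value: unknown): string {
  const seen = new WeakSet<object>();
  try {
    const json: string | undefined = JSON.stringify(value, (_key, val) => {
      if (typeof val === 'object' && val !== null) {
        if (seen.has(val)) return '[Circular]';
        seen.add(val);
      }
      return val;
    });
    // JSON.stringify yields undefined for values such as functions
    return json ?? String(value);
  } catch {
    // e.g. BigInt values, which JSON.stringify rejects
    return String(value);
  }
}

/**
 * Returns a function producing IDs that stay unique even when the clock
 * does not advance between calls, by appending a running counter.
 */
function createIdGenerator(now: () => number = Date.now): () => string {
  let counter = 0;
  return () => `${now().toString(36)}-${(counter++).toString(36)}`;
}

// Usage
const cyclic: Record<string, unknown> = { name: 'node' };
cyclic['self'] = cyclic;
const serialized = safeStringify(cyclic);

const nextId = createIdGenerator(() => 1000); // fixed clock forces the same-millisecond case
const ids = [nextId(), nextId()];
```

With a counter in the ID, the message hash is no longer needed for uniqueness; whether to keep it for other purposes (for example deduplication) is a design choice this sketch leaves open.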

Step 3: Priority-Based Upload Orchestration

Now comes the most critical part: implementing the upload logic with proper priority handling and queue management. This is where RxJS really shines.

3.1 HTTP Upload Service

First, let's set up the basic upload mechanism:

```typescript
constructor(private http: HttpClient) {}

private logUploader(event: LogEvent) {
  // Snapshot of everything stored so far, taken when the upload starts
  const log = this.logStorage.getHistory();
  return this.http.post('https://jsonplaceholder.typicode.com/posts', {
    logs: log,
    trigger: event,
  });
}
```
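One thing the uploader leaves open is what happens when the request fails. An error from an inner observable propagates through switchMap and terminates the outer subscription, so no later uploads would run. A minimal sketch of one way to contain failures follows; `keepAlive` is a hypothetical helper, and `throwError`/`of` stand in for a failing and a successful HTTP call.

```typescript
import { EMPTY, Observable, Subject, catchError, of, switchMap, throwError } from 'rxjs';

/**
 * Hypothetical helper: reports the failure and completes quietly,
 * so the surrounding switchMap pipeline stays subscribed.
 */
function keepAlive<T>(upload$: Observable<T>, onError: (err: unknown) => void): Observable<T> {
  return upload$.pipe(
    catchError(err => {
      // Avoid console.error here while the console is patched,
      // otherwise the failure itself would trigger another upload.
      onError(err);
      return EMPTY;
    }),
  );
}

const uploadSrc = new Subject<() => Observable<string>>();
const results: string[] = [];
const failures: unknown[] = [];

uploadSrc
  .pipe(switchMap(uploader => keepAlive(uploader(), err => failures.push(err))))
  .subscribe(result => results.push(result));

uploadSrc.next(() => throwError(() => new Error('offline'))); // fails, pipeline survives
uploadSrc.next(() => of('uploaded'));                          // still processed
```

Since the stored logs are only cleared after a successful response, a failed upload handled this way leaves them in storage for the next attempt.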

3.2 Single Upload Constraint

To ensure only one upload is active at a time, we use RxJS's switchMap operator, which unsubscribes from the previous inner observable (canceling the upload in flight) whenever a new value arrives:

```typescript
// Class fields
private uploadSrc = new Subject<() => Observable<any>>();
readonly upload$ = this.uploadSrc.asObservable();

// Inside an initialization method:
// this ensures only one upload is active at a time
this.upload$.pipe(switchMap(uploader => uploader())).subscribe();
```
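To see the cancellation behavior in isolation, here is a small sketch that replaces the HTTP call with a hypothetical `fakeUpload` observable. It never completes by itself and records when it is subscribed and when it is torn down.

```typescript
import { Observable, Subject, switchMap } from 'rxjs';

const events: string[] = [];

// Hypothetical stand-in for an HTTP upload.
function fakeUpload(name: string): Observable<never> {
  return new Observable<never>(() => {
    events.push(`start ${name}`);
    // Teardown runs when the subscription is closed
    return () => {
      events.push(`cancel ${name}`);
    };
  });
}

const uploadSrc = new Subject<() => Observable<unknown>>();
const sub = uploadSrc.pipe(switchMap(uploader => uploader())).subscribe();

uploadSrc.next(() => fakeUpload('A'));
uploadSrc.next(() => fakeUpload('B')); // A is unsubscribed before B starts
sub.unsubscribe();                     // tears down B as well

// events: start A, cancel A, start B, cancel B
```

Passing a factory function rather than an observable means the upload body is only assembled when switchMap actually invokes it, which is why each upload sees the storage contents at its own start time.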

3.3 Priority-Based Triggers

We separate console events into two priority streams:

```typescript
private triggerSrc = new Subject<LogEvent>();
private logEvents$ = this.triggerSrc.asObservable();

// High priority: immediate upload
private readonly triggerError$ = this.logEvents$.pipe(
  filter(({ methodName }) => methodName === 'error'),
);

// Low priority: debounced upload (1 minute delay)
private readonly triggerWarn$ = this.logEvents$.pipe(
  filter(({ methodName }) => methodName === 'warn'),
  debounceTime(60000), // Wait 1 minute, reset timer on new warnings
);
```
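As written, triggerWarn$ debounces warn events only, so an error that arrives inside the one-minute window does not reset or cancel the pending warning trigger. If requirement 5 should also hold at the trigger level, one option is to let errors take part in the debounce and filter them out afterwards. The sketch below is an assumption about how that could look, not the article's implementation; `warnTrigger` is a hypothetical function, and a VirtualTimeScheduler is used so the timing can be observed without waiting.

```typescript
import {
  Observable,
  SchedulerLike,
  Subject,
  VirtualTimeScheduler,
  asyncScheduler,
  debounceTime,
  filter,
} from 'rxjs';

type LogEvent = { methodName: string; loggerName: string; msgs: unknown[] };

/**
 * Hypothetical variant of triggerWarn$: errors restart the debounce window,
 * so an error inside the window replaces the pending warning and is then
 * filtered out. Plain `log` events do not affect the timer.
 */
function warnTrigger(
  events$: Observable<LogEvent>,
  delayMs = 60000,
  scheduler: SchedulerLike = asyncScheduler,
): Observable<LogEvent> {
  return events$.pipe(
    filter(({ methodName }) => methodName === 'warn' || methodName === 'error'),
    debounceTime(delayMs, scheduler),
    filter(({ methodName }) => methodName === 'warn'),
  );
}

// Demonstration in virtual time
const scheduler = new VirtualTimeScheduler();
const source = new Subject<LogEvent>();
const fired: unknown[] = [];
warnTrigger(source, 60000, scheduler).subscribe(e => fired.push(e.msgs[0]));

const emitAt = (ms: number, methodName: string, msg: string) =>
  scheduler.schedule(() => source.next({ methodName, loggerName: 'console', msgs: [msg] }), ms);

emitAt(0, 'warn', 'w1');       // superseded by w2
emitAt(10000, 'warn', 'w2');   // pending...
emitAt(20000, 'error', 'e1');  // ...until the error replaces it; nothing fires
emitAt(100000, 'warn', 'w3');  // fires one minute later
scheduler.flush();
```

Errors are still uploaded immediately through triggerError$; this variant only affects whether a warning that preceded the error gets its own delayed upload.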

3.4 Basic Upload Subscription

Initially, we can set up simple subscriptions for both triggers:

```typescript
initUploading() {
  this.triggerError$
    .pipe(
      tap(evt => {
        this.uploadSrc.next(() => this.logUploader(evt));
      }),
    )
    .subscribe();

  this.triggerWarn$
    .pipe(
      tap(evt => {
        this.uploadSrc.next(() => this.logUploader(evt));
      }),
    )
    .subscribe();
}
```

Step 4: Advanced Priority and Queue Management

Now we need to implement the sophisticated requirements that make this system truly robust:

Requirements Analysis

  1. Error Priority Override: Error logs must immediately cancel any pending warning uploads
  2. Warning Queue Management: Warning logs should wait if an error upload is in progress
  3. Single Upload Constraint: Only one upload active at any time to prevent network congestion
  4. State Persistence: Logs should not be lost during upload cancellations

4.1 Upload State Management

We need to track whether an upload is currently in progress:

```typescript
private withUploadBusy$ = new BehaviorSubject<boolean>(false);
private latestWarnEvent$ = new BehaviorSubject<LogEvent | null>(null);
```

withUploadBusy$ tracks whether an error upload is in flight, while latestWarnEvent$ buffers the most recent warning event during busy periods.

4.2 Enhanced Upload Logic

First, let's clear logs after successful uploads:

```typescript
initUploading() {
  // Clear logs after successful upload
  this.upload$
    .pipe(switchMap(uploader => uploader()))
    .pipe(tap(() => this.logStorage.clearHistory()))
    .subscribe();
}
```
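Note the timing here: the history is read when the request starts, but clearHistory() runs when the response arrives, so entries logged while the request is in flight would be cleared without having been sent. One way to avoid that is to remove only the entries that were part of the request. The sketch below illustrates the idea with a hypothetical `SnapshotStore`; it is not the article's LogStorage class.

```typescript
type Entry = { id: string; payload: string };

// Hypothetical store illustrating "remove only what was uploaded".
class SnapshotStore {
  private entries = new Map<string, Entry>();

  add(entry: Entry): void {
    this.entries.set(entry.id, entry);
  }

  /** Copy of what is stored right now; later additions are not included. */
  snapshot(): Entry[] {
    return Array.from(this.entries.values());
  }

  /** Removes only the given entries, keeping anything added since the snapshot. */
  remove(uploaded: Entry[]): void {
    for (const { id } of uploaded) {
      this.entries.delete(id);
    }
  }
}

// Usage
const store = new SnapshotStore();
store.add({ id: 'a', payload: 'first' });
store.add({ id: 'b', payload: 'second' });

const inFlight = store.snapshot(); // what the request body would contain
store.add({ id: 'c', payload: 'arrived during upload' });

store.remove(inFlight); // on success
const remaining = store.snapshot().map(e => e.id);
```

In the pipeline, this would mean carrying the snapshot along with the response so that the success handler knows which entries to delete.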

4.3 Error Upload Priority

Error uploads get immediate priority and set the busy flag:

```typescript
// High-priority error log uploads
this.triggerError$
  .pipe(
    tap(evt => {
      // Mark system as busy
      this.withUploadBusy$.next(true);

      // Start upload with finalize to clear busy state
      this.uploadSrc.next(() =>
        this.logUploader(evt).pipe(finalize(() => this.withUploadBusy$.next(false))),
      );
    }),
  )
  .subscribe();
```
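A subtlety with finalize: it also runs when switchMap unsubscribes a superseded upload. With the code above, if a second error arrives while the first error upload is still running, the flag is raised first and then lowered by the first upload's finalize, so it appears to read false while the second upload is in flight. The following sketch shows one possible adjustment, raising the flag when the upload is actually subscribed by wrapping it in defer. `trackBusy` is a hypothetical helper, and Subjects stand in for HTTP calls.

```typescript
import { BehaviorSubject, Observable, Subject, defer, finalize, switchMap } from 'rxjs';

const busy$ = new BehaviorSubject<boolean>(false);
const busyHistory: boolean[] = [];
busy$.subscribe(busy => busyHistory.push(busy));

/**
 * Hypothetical helper: raises the flag when the upload is subscribed and
 * lowers it when the upload completes, errors, or is unsubscribed.
 */
function trackBusy<T>(upload$: Observable<T>): Observable<T> {
  return defer(() => {
    busy$.next(true);
    return upload$.pipe(finalize(() => busy$.next(false)));
  });
}

const uploadSrc = new Subject<Observable<string>>();
uploadSrc.pipe(switchMap(upload$ => upload$)).subscribe();

const first = new Subject<string>();  // stands in for a slow error upload
const second = new Subject<string>(); // a second error upload that supersedes it

uploadSrc.next(trackBusy(first));  // flag goes up
uploadSrc.next(trackBusy(second)); // first is canceled (down), second subscribes (up)
const busyWhileSecondRuns = busy$.value;

second.complete();                 // flag goes down
```

The flag still drops to false for an instant between the two uploads, so anything that reacts to the idle state synchronously would still observe that moment; this sketch only ensures the flag ends up true while the replacement upload is running.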

4.4 Warning Upload Queue Management

Warnings are either uploaded immediately or buffered based on system state:

```typescript
// Buffer latest warning log event based on busy state
this.triggerWarn$
  .pipe(withLatestFrom(this.withUploadBusy$))
  .pipe(
    tap(([evt, busy]) => {
      if (busy) {
        // System is busy, buffer the latest warning
        this.latestWarnEvent$.next(evt);
      } else {
        // System is idle, upload immediately
        this.uploadSrc.next(() => this.logUploader(evt));
      }
    }),
  )
  .subscribe();
```

4.5 Processing Buffered Warnings

When the system becomes idle, process any buffered warning:

```typescript
// Process buffered warnings when system becomes idle
this.withUploadBusy$
  .pipe(distinctUntilChanged())
  .pipe(filter(busy => !busy))
  .pipe(withLatestFrom(this.latestWarnEvent$))
  .pipe(map(([, evt]) => evt))
  .pipe(filter((evt): evt is LogEvent => !!evt))
  .pipe(
    tap(evt => {
      // Clear buffer and start upload
      this.latestWarnEvent$.next(null);
      this.uploadSrc.next(() => this.logUploader(evt));
    }),
  )
  .subscribe();
```
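The buffer-and-flush interplay of sections 4.4 and 4.5 can be exercised on its own with plain Subjects. This is a reduced sketch of the same operator chain. The names `Evt`, `busy$`, `buffered$`, and `started` are placeholders, and pushing to an array stands in for starting an upload.

```typescript
import { BehaviorSubject, Subject, distinctUntilChanged, filter, map, withLatestFrom } from 'rxjs';

type Evt = { methodName: string; msgs: unknown[] };

const busy$ = new BehaviorSubject<boolean>(false);
const buffered$ = new BehaviorSubject<Evt | null>(null);
const warn$ = new Subject<Evt>(); // stands in for the debounced warning trigger
const started: string[] = [];     // stands in for "upload started"

// Busy: remember only the latest warning. Idle: start right away.
warn$.pipe(withLatestFrom(busy$)).subscribe(([evt, busy]) => {
  if (busy) {
    buffered$.next(evt);
  } else {
    started.push(String(evt.msgs[0]));
  }
});

// When the system turns idle, flush the buffered warning, if any.
busy$
  .pipe(
    distinctUntilChanged(),
    filter(busy => !busy),
    withLatestFrom(buffered$),
    map(([, evt]) => evt),
    filter((evt): evt is Evt => evt !== null),
  )
  .subscribe(evt => {
    buffered$.next(null);
    started.push(String(evt.msgs[0]));
  });

const warn = (msg: string): Evt => ({ methodName: 'warn', msgs: [msg] });

busy$.next(true);       // an error upload begins
warn$.next(warn('w1')); // buffered
warn$.next(warn('w2')); // replaces w1 in the buffer
busy$.next(false);      // error upload done: w2 is flushed
warn$.next(warn('w3')); // idle: starts immediately
```

Only the latest buffered warning is kept because each upload sends the whole stored history anyway; the event merely decides when the next upload happens.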

This completes our advanced priority and queue management system. The final initUploading method should be:

```typescript
initUploading() {
  this.upload$
    .pipe(switchMap(uploader => uploader()))
    .pipe(tap(() => this.logStorage.clearHistory()))
    .subscribe();

  // high-priority error log uploads
  this.triggerError$
    .pipe(
      tap(evt => {
        // mark busy
        this.withUploadBusy$.next(true);
        // push to upload$; the global switchMap is responsible for canceling the old upload
        this.uploadSrc.next(() =>
          this.logUploader(evt).pipe(finalize(() => this.withUploadBusy$.next(false))),
        );
      }),
    )
    .subscribe();

  // buffer the latest warning log event:
  // depending on the busy state, either upload immediately or buffer
  this.triggerWarn$
    .pipe(withLatestFrom(this.withUploadBusy$))
    .pipe(
      tap(([evt, busy]) => {
        if (busy) {
          this.latestWarnEvent$.next(evt); // only remember the latest
        } else {
          this.uploadSrc.next(() => this.logUploader(evt));
        }
      }),
    )
    .subscribe();

  // when idle, run the buffered warning upload
  this.withUploadBusy$
    .pipe(distinctUntilChanged())
    .pipe(filter(busy => !busy))
    .pipe(withLatestFrom(this.latestWarnEvent$))
    .pipe(map(([, evt]) => evt))
    .pipe(filter((evt): evt is LogEvent => !!evt))
    .pipe(
      tap(evt => {
        // clear buffer
        this.latestWarnEvent$.next(null);
        // start upload; may be canceled later by a new error log event via switchMap
        this.uploadSrc.next(() => this.logUploader(evt));
      }),
    )
    .subscribe();
}
```


That is all the logic needed to fulfill the upload requirements. The complete code is as follows:

```typescript
import { HttpClient } from '@angular/common/http';
import { Injectable } from '@angular/core';
import {
  BehaviorSubject,
  Observable,
  Subject,
  debounceTime,
  distinctUntilChanged,
  filter,
  finalize,
  map,
  switchMap,
  tap,
  withLatestFrom,
} from 'rxjs';
import { SubSink } from 'subsink';

export type LogEvent = {
  methodName: string;
  loggerName: string;
  msgs: unknown[];
};

type StoredLogEntry = {
  id: string;
  timestamp: Date;
  logEvent: LogEvent;
};

class LogStorage {
  private logMap = new Map<string, StoredLogEntry>();

  private generateLogId(msgs: unknown[]): string {
    const msgString = msgs
      .map(msg => (typeof msg === 'object' ? JSON.stringify(msg) : String(msg)))
      .join(' ');

    const timestamp = Date.now();
    let hash = 0;
    const combined = msgString + timestamp;

    for (let i = 0; i < combined.length; i++) {
      const char = combined.charCodeAt(i);
      hash = (hash << 5) - hash + char;
      hash = hash & hash; // Convert to 32-bit integer
    }

    return Math.abs(hash).toString(36) + timestamp.toString(36);
  }

  addLog(logEvent: LogEvent): string {
    const id = this.generateLogId(logEvent.msgs);
    const entry: StoredLogEntry = {
      id,
      timestamp: new Date(),
      logEvent,
    };
    this.logMap.set(id, entry);
    return id;
  }

  getHistory(): StoredLogEntry[] {
    return Array.from(this.logMap.values()).sort(
      (a, b) => a.timestamp.getTime() - b.timestamp.getTime(),
    );
  }

  clearHistory(): void {
    this.logMap.clear();
  }
}

@Injectable({
  providedIn: 'root',
})
export class LogReportService {
  private logStorage = new LogStorage();

  private originalConsoleLog = window.console.log;
  private originalConsoleError = window.console.error;
  private originalConsoleWarn = window.console.warn;

  private triggerSrc = new Subject<LogEvent>();
  private logEvents$ = this.triggerSrc.asObservable();

  private readonly triggerError$ = this.logEvents$.pipe(
    filter(({ methodName }) => methodName === 'error'),
  );

  private readonly triggerWarn$ = this.logEvents$.pipe(
    filter(({ methodName }) => methodName === 'warn'),
    // wait 1 minute before triggering a warning upload
    debounceTime(60 * 1000),
  );

  private uploadSrc = new Subject<() => Observable<any>>();
  readonly upload$ = this.uploadSrc.asObservable();

  private readonly withUploadBusy$ = new BehaviorSubject<boolean>(false);
  private readonly latestWarnEvent$ = new BehaviorSubject<LogEvent | null>(null);

  private subs = new SubSink();

  constructor(private http: HttpClient) {}

  init() {
    this.startLogging();
    this.subs.sink = this.logEvents$.subscribe(event => this.logStorage.addLog(event));
    this.initUploading();
  }

  destroy(): void {
    // Unsubscribe from all subscriptions
    this.subs.unsubscribe();

    // Complete all subjects
    this.triggerSrc.complete();
    this.uploadSrc.complete();
    this.withUploadBusy$.complete();
    this.latestWarnEvent$.complete();

    // Restore original console methods
    this.stopLogging();

    // Clear log storage
    this.logStorage.clearHistory();
  }

  private initUploading() {
    this.subs.sink = this.upload$
      .pipe(switchMap(uploader => uploader()))
      .pipe(tap(() => this.logStorage.clearHistory()))
      .subscribe();

    this.subs.sink = this.triggerError$
      .pipe(
        tap(evt => {
          this.withUploadBusy$.next(true);
          this.uploadSrc.next(() =>
            this.logUploader(evt).pipe(finalize(() => this.withUploadBusy$.next(false))),
          );
        }),
      )
      .subscribe();

    this.subs.sink = this.triggerWarn$
      .pipe(withLatestFrom(this.withUploadBusy$))
      .pipe(
        tap(([evt, busy]) => {
          if (busy) {
            this.latestWarnEvent$.next(evt);
          } else {
            this.uploadSrc.next(() => this.logUploader(evt));
          }
        }),
      )
      .subscribe();

    this.subs.sink = this.withUploadBusy$
      .pipe(distinctUntilChanged())
      .pipe(filter(busy => !busy))
      .pipe(withLatestFrom(this.latestWarnEvent$))
      .pipe(map(([, evt]) => evt))
      .pipe(filter((evt): evt is LogEvent => !!evt))
      .pipe(
        tap(evt => {
          this.latestWarnEvent$.next(null);
          this.uploadSrc.next(() => this.logUploader(evt));
        }),
      )
      .subscribe();
  }

  private startLogging(): void {
    this.setupConsoleOverrides();
  }

  private stopLogging(): void {
    this.restoreConsole();
  }

  private logUploader(event: LogEvent) {
    const log = this.logStorage.getHistory();
    return this.http.post('https://jsonplaceholder.typicode.com/posts', {
      logs: log,
      trigger: event,
    });
  }

  private setupConsoleOverrides(): void {
    window.console.log = (...args) => {
      this.originalConsoleLog.apply(window.console, args);
      this.triggerSrc.next({
        methodName: 'log',
        loggerName: 'console',
        msgs: args,
      });
    };

    window.console.error = (...args) => {
      this.originalConsoleError.apply(window.console, args);
      this.triggerSrc.next({
        methodName: 'error',
        loggerName: 'console',
        msgs: args,
      });
    };

    window.console.warn = (...args) => {
      this.originalConsoleWarn.apply(window.console, args);
      this.triggerSrc.next({
        methodName: 'warn',
        loggerName: 'console',
        msgs: args,
      });
    };
  }

  private restoreConsole(): void {
    window.console.log = this.originalConsoleLog;
    window.console.error = this.originalConsoleError;
    window.console.warn = this.originalConsoleWarn;
  }
}
```