Background

If you have been using Angular for some time, you probably know that many of its core features are coupled with RxJS. Whether this is good or not is the subject of another post. Still, it is clearly inconvenient when another core feature does not play well with RxJS.

One of the most notable examples is @Input. It is arguably one of the fundamental parts of the framework, yet there is no built-in way to turn these properties into observables. A GitHub issue about this was open for almost six years. As always, many solutions have turned up that try to solve the problem with existing tools.

As I got more annoyed by the lack of the feature, I did what anyone would do: I started looking for the best available third-party solution. Unfortunately, most of them were half-baked or outdated: they lacked template type-checking support, had inconvenient APIs, contained memory leaks, and so on. The one I ended up using, called neo-observable-input, came from one of the last comments on the issue. Although it carried some boilerplate, the implementation was elegant, and it worked.

Comment of Paul Draper in the GitHub issue

However, as time went on, the boilerplate started bothering me. Having to define a property to instantiate ObservableInputs, plus a property for each “reactified” input (even when it is used only once), plus ngOnChanges, was just too much. There had to be a cleaner solution. I spent some time tinkering with new approaches and ended up with something fairly complicated: it hooked into Angular's lifecycle hooks, which is much harder than one would think.

Almost getting it right

Once I implemented my solution, I accidentally stumbled upon the package observable-from-input. Seeing the relatively simple implementation, I was shocked. After inspecting the code, I realized it had a serious flaw (besides having an unnecessarily ugly API). The following StackBlitz demonstrates the problem (type something into the input field and look at the console).

If we look into the code, we can see the problem: whenever fromInput is called, the property is redefined. The new definition correctly handles the “original” behavior (setting the value and returning it through the getter), but its setter only pushes values into the newly created subject, so the subject from the previous call never emits again. As a result, when fromInput is called multiple times for the same property within a component, only the last definition remains in place, and only the last observable returned emits values.

import {Observable, ReplaySubject} from 'rxjs';

export const fromInput = <T>(target: T) => <K extends keyof T>(name: K): Observable<T[K]> => {
    let current: T[K];
    const subject = new ReplaySubject<T[K]>(1);

    if (target[name] !== undefined) {
        subject.next(target[name] as any);
    }

    // This setter will be overwritten by the next 'fromInput' call, and therefore
    // 'subject' will never emit.
    Object.defineProperty(target, name, {
        set(value: T[K]) {
            subject.next(value);
            current = value;
        },
        get() {
            return current;
        },
    });

    return subject.asObservable();
};
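
The overwrite can be reproduced without Angular or RxJS. The sketch below is a stripped-down stand-in, not the package's code: watch is a hypothetical helper that redefines the property in the same way, but calls a plain callback instead of pushing into a subject.

```typescript
// Hypothetical helper mirroring the redefinition in fromInput: every call
// installs a fresh setter on the same property and notifies a plain callback.
function watch<T extends object, K extends keyof T>(
    target: T,
    name: K,
    onChange: (value: T[K]) => void,
): void {
    let current = target[name];
    Object.defineProperty(target, name, {
        // Configurable, so that a later call is allowed to redefine the property.
        configurable: true,
        set(value: T[K]) {
            current = value;
            onChange(value);
        },
        get() {
            return current;
        },
    });
}

const component = {label: 'initial'};
const first: string[] = [];
const second: string[] = [];

watch(component, 'label', value => first.push(value));
watch(component, 'label', value => second.push(value)); // replaces the first setter

component.label = 'updated';
// Only the most recently installed setter runs, so 'first' stays empty
// while 'second' receives 'updated'.
```

The first callback is not unsubscribed or broken in any visible way; its setter is simply gone, which is why the bug is easy to miss.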

Improving it

Fortunately, there is an easy fix: caching. If the problem is that the property keeps being redefined (and therefore overwritten), the easiest solution is to never overwrite it. If fromInput is called again for the same property, we can return the original observable instead of creating a new one.

There are multiple ways to implement this. The first approach is to define a “secret” property on the component, where we store all the subjects. Whenever the cache already holds an entry for the given property name, we return the “cached” observable.

import {Observable, ReplaySubject} from 'rxjs';

const SECRET_PROPERTY_NAME = '__secretFromInputCache__';

export const fromInput = <T extends object>(target: T) => <K extends keyof T>(name: K): Observable<T[K]> => {
    // Check whether a subject was already created for given 'name'. If yes, return early.
    // The "secret" property is not part of T, so it has to be read through a cast.
    const cache: Record<PropertyKey, ReplaySubject<any>> | undefined = (target as any)[SECRET_PROPERTY_NAME];
    const alreadyCachedSubject = cache?.[name];
    if (alreadyCachedSubject) {
        return alreadyCachedSubject.asObservable();
    }

    // Initialize a new subject if this is the first time.
    // Start from the existing value, so the getter below keeps returning it.
    let current = target[name];
    const subject = new ReplaySubject<T[K]>(1);

    // Update (or create) the cache of the component to include the newly created subject.
    Object.assign(target, {
        [SECRET_PROPERTY_NAME]: {...cache, [name]: subject},
    });

    if (current !== undefined) {
        subject.next(current);
    }

    // Now the property is only defined if it hadn't been before.
    Object.defineProperty(target, name, {
        set(value: T[K]) {
            subject.next(value);
            current = value;
        },
        get() {
            return current;
        },
    });

    return subject.asObservable();
};

Although it works fine, it (hopefully) still does not feel right to have these “secret properties” lying around. It feels hacky. Instead, we could define a map outside the function’s scope, where we could store our cache. It works the same way as the previous approach, but this time the cache object is not stored in the component object itself.

import {Observable, ReplaySubject} from 'rxjs';

// Create an entry for each component, with the component instance as the key, and
// the same cache object as the value.
// For now, the typing is not interesting.
const FROM_INPUT_CACHE = new Map<object, Record<PropertyKey, ReplaySubject<any>>>();

export const fromInput = <T extends object>(target: T) => <K extends keyof T>(name: K): Observable<T[K]> => {
    // Instead of checking the "secret property" of the component, check the "global" map cache.
    const alreadyCachedSubject = FROM_INPUT_CACHE.get(target)?.[name];
    if (alreadyCachedSubject) {
        return alreadyCachedSubject.asObservable();
    }

    // Initialize a new subject if this is the first time, just as before.
    // Start from the existing value, so the getter below keeps returning it.
    let current = target[name];
    const subject = new ReplaySubject<T[K]>(1);

    // Instead of updating the component instance's cache, update the global cache.
    FROM_INPUT_CACHE.set(target, {...FROM_INPUT_CACHE.get(target), [name]: subject});

    if (current !== undefined) {
        subject.next(current);
    }

    Object.defineProperty(target, name, {
        set(value: T[K]) {
            subject.next(value);
            current = value;
        },
        get() {
            return current;
        },
    });

    return subject.asObservable();
};

Anyone with more experience can easily catch the problem here: garbage collection. Here is a good description of how it works if you are unfamiliar with it. By using a traditional Map, we keep a strong reference to every component instance, which prevents it from being garbage collected even after Angular has destroyed it. Over time, this leaks memory. Fortunately (again), WeakMap is here to save us. The main difference between WeakMap and a traditional Map is that WeakMap does not prevent its object keys (and therefore its values) from being garbage collected when the time comes. Hence, it is safe to use the component instance as the key. Once Angular destroys the component and nothing else references it, it can be garbage collected, together with all the cached subjects.

import {Observable, ReplaySubject} from 'rxjs';

// The only difference compared to the previous example: WeakMap instead of Map.
const FROM_INPUT_CACHE = new WeakMap<object, Record<PropertyKey, ReplaySubject<any>>>();

export const fromInput = <T extends object>(target: T) => <K extends keyof T>(name: K): Observable<T[K]> => {
    const alreadyCachedSubject = FROM_INPUT_CACHE.get(target)?.[name];
    if (alreadyCachedSubject) {
        return alreadyCachedSubject.asObservable();
    }

    // Start from the existing value, so the getter below keeps returning it.
    let current = target[name];
    const subject = new ReplaySubject<T[K]>(1);

    FROM_INPUT_CACHE.set(target, {...FROM_INPUT_CACHE.get(target), [name]: subject});

    if (current !== undefined) {
        subject.next(current);
    }

    Object.defineProperty(target, name, {
        set(value: T[K]) {
            subject.next(value);
            current = value;
        },
        get() {
            return current;
        },
    });

    return subject.asObservable();
};

Final solution

On top of the caching, the final API is also a bit friendlier than that of observable-from-input. There is no need to call the returned function (the double parentheses are gone), and there is no need to pass in a generic type either. Additionally, a runtime error is thrown if the selected property name is not an input. The final version is available as a StackBlitz.
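
The StackBlitz is not reproduced in this text, so what follows is only a rough, dependency-free sketch of the call shape described above, not the final implementation. LatestValue is a hypothetical stand-in for ReplaySubject(1), the inner cache is a Map rather than a plain object, and the runtime check for non-input properties is left out because it depends on Angular metadata that is not shown here.

```typescript
// Hypothetical stand-in for ReplaySubject(1): remembers the latest value
// and replays it to late subscribers.
class LatestValue<V> {
    private hasValue = false;
    private latest: V | undefined;
    private readonly listeners: Array<(value: V) => void> = [];

    next(value: V): void {
        this.hasValue = true;
        this.latest = value;
        this.listeners.forEach(listener => listener(value));
    }

    subscribe(listener: (value: V) => void): void {
        if (this.hasValue) {
            listener(this.latest as V);
        }
        this.listeners.push(listener);
    }
}

// One cache entry per instance; the inner map is keyed by property name.
const CACHE = new WeakMap<object, Map<PropertyKey, LatestValue<any>>>();

// Single call; both type parameters are inferred from the arguments.
function fromInput<T extends object, K extends keyof T>(target: T, name: K): LatestValue<T[K]> {
    let streams = CACHE.get(target);
    if (!streams) {
        streams = new Map();
        CACHE.set(target, streams);
    }
    const cached = streams.get(name);
    if (cached) {
        return cached;
    }

    const stream = new LatestValue<T[K]>();
    streams.set(name, stream);

    // Keep the existing value so the getter does not lose it.
    let current = target[name];
    if (current !== undefined) {
        stream.next(current);
    }
    Object.defineProperty(target, name, {
        configurable: true,
        set(value: T[K]) {
            current = value;
            stream.next(value);
        },
        get() {
            return current;
        },
    });
    return stream;
}

const component = {count: 1};
const seenByA: number[] = [];
const seenByB: number[] = [];

fromInput(component, 'count').subscribe(value => seenByA.push(value));
fromInput(component, 'count').subscribe(value => seenByB.push(value)); // cached stream, no redefinition

component.count = 2;
// Both subscribers receive the replayed initial value and the update.
```

Because the second call hits the cache, the property is defined exactly once, and both subscribers stay connected to the same stream.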

Limitations

Although I would consider this approach neat, there are still some limitations. For example, the input properties have to be public for the typing to work (a constraint from TypeScript). Also, if the property is re-initialized for some reason (e.g. by using some custom decorator), it will stop working, as we saw in the first example. These are not major issues, but it is better to be aware of these limitations.
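
The public-only constraint follows from how keyof works in TypeScript: it lists only the public members of a type. A small illustration, using a hypothetical ExampleComponent:

```typescript
class ExampleComponent {
    public title = 'visible';
    private secret = 'hidden';

    describe(): string {
        return `${this.title}/${this.secret}`;
    }
}

// keyof only lists public members, so this is 'title' | 'describe'.
type InputName = keyof ExampleComponent;

const allowed: InputName = 'title';
// const rejected: InputName = 'secret'; // compile error: private members are not part of keyof
```

Since the property name parameter is constrained by keyof, a private or protected input cannot be passed in without a cast.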

Conclusion

If you managed to break this implementation or found a memory leak, please shoot me an email. I am excited to polish the solution further. Also, I am planning on releasing the implementation as a separate npm package, but until then, you can easily copy the single source file.