Combine.Publisher.flatMap memory leak when publisher fails

Originator:slammer0
Number:rdar://FB9860241 Date Originated:Jan 27, 2022 at 11:16 AM
Status:Open Resolved:
Product:Something else not on this list Product Version:
Classification:Incorrect/Unexpected Behavior Reproducible:yes
 
Please describe the issue:
When flatMap operator is used and failing publisher is returned from transform block, it never cancels upstream and blocks any cancelling efforts causing memory leak.

Please list the steps you took to reproduce the issue:
publisher.flatMap { _ in
    Fail(error: MyError.error)
}
That's all but please find attached swift with example that can be executed

What did you expect to happen?
subscription to publisher gets cancelled
please see example in attached file where I wrote draft of fixed flatMap by returning Result and then using tryMap outside the transform block

What actually happened?
subscription to publisher is still active, calling cancel on stream cancellable has no effect, there is no way to cancel the subscription

Attached file:

CombineFlatMapIssue.swift
https://gist.github.com/piotrtobolski/b6fa065194935f2d083e96375e3ca432

import Combine

enum MyError: Error {
    case error
}

let subject1 = PassthroughSubject<Int, Error>()

let cancellable1 = subject1
    .print("before flatMap")
    .flatMap { _ in
        Fail<Int, Error>(error: MyError.error)
    }
    .print("after flatMap")
    .sink {
        print("completion: \($0)")
    } receiveValue: {
        print("receiveValue: \($0)")
    }

print("Sending 1")
subject1.send(1)
print("Sending 2")
subject1.send(2)
print("Cancelling")
cancellable1.cancel()
print("Sending 3")
subject1.send(3)

extension Publisher {
    func flatMap2<T, P>(
        maxPublishers: Subscribers.Demand = .unlimited,
        _ transform: @escaping (Self.Output) -> P
    ) -> AnyPublisher<T, Failure>
    where T == P.Output, P : Publisher, Self.Failure == P.Failure {
        flatMap(maxPublishers: maxPublishers) {
            transform($0)
                .map { value -> Result<T, Failure> in
                    Result.success(value)
                }
                .catch {
                    return Just<Result<T, Failure>>(Result.failure($0))
                }
        }
        .tryMap { result in
            switch result {
            case let .success(value):
                return value
            case let .failure(error):
                throw error
            }
        }
        .mapError { error in
            error as! Failure
        }
        .eraseToAnyPublisher()
    }
}

print("-----------------")

let subject2 = PassthroughSubject<Int, Error>()

let cancellable2 = subject2
    .print("before flatMap")
    .flatMap2 { _ in
        Fail<Int, Error>(error: MyError.error)
    }
    .print("after flatMap")
    .sink {
        print("completion: \($0)")
    } receiveValue: {
        print("receiveValue: \($0)")
    }

print("Sending 1")
subject2.send(1)
print("Sending 2")
subject2.send(2)
print("Cancelling")
cancellable2.cancel()
print("Sending 3")
subject2.send(3)

/*
output:
after flatMap: receive subscription: (FlatMap)
after flatMap: request unlimited
before flatMap: receive subscription: (PassthroughSubject)
before flatMap: request unlimited
Sending 1
before flatMap: receive value: (1)
after flatMap: receive error: (error)
completion: failure(__lldb_expr_13.MyError.error)
Sending 2
before flatMap: receive value: (2)
Cancelling
Sending 3
before flatMap: receive value: (3)
-----------------
after flatMap: receive subscription: (TryMap)
after flatMap: request unlimited
before flatMap: receive subscription: (PassthroughSubject)
before flatMap: request unlimited
Sending 1
before flatMap: receive value: (1)
before flatMap: receive cancel
after flatMap: receive error: (error)
completion: failure(__lldb_expr_13.MyError.error)
Sending 2
Cancelling
Sending 3
*/

Comments


Please note: Reports posted here will not necessarily be seen by Apple. All problems should be submitted at bugreport.apple.com before they are posted here. Please only post information for Radars that you have filed yourself, and please do not include Apple confidential information in your posts. Thank you!