import * as Cause from "../Cause.js" import type * as Channel from "../Channel.js" import * as Chunk from "../Chunk.js" import * as Context from "../Context.js" import * as Deferred from "../Deferred.js" import * as Effect from "../Effect.js" import * as Either from "../Either.js" import * as Equal from "../Equal.js" import * as Exit from "../Exit.js" import * as Fiber from "../Fiber.js" import * as FiberRef from "../FiberRef.js" import { constVoid, dual, identity, pipe } from "../Function.js" import type { LazyArg } from "../Function.js" import * as Layer from "../Layer.js" import type * as MergeDecision from "../MergeDecision.js" import type * as MergeState from "../MergeState.js" import type * as MergeStrategy from "../MergeStrategy.js" import * as Option from "../Option.js" import { hasProperty, type Predicate } from "../Predicate.js" import * as PubSub from "../PubSub.js" import * as Queue from "../Queue.js" import * as Ref from "../Ref.js" import * as Scope from "../Scope.js" import type * as SingleProducerAsyncInput from "../SingleProducerAsyncInput.js" import type * as Tracer from "../Tracer.js" import type * as Types from "../Types.js" import * as executor from "./channel/channelExecutor.js" import type * as ChannelState from "./channel/channelState.js" import * as mergeDecision from "./channel/mergeDecision.js" import * as mergeState from "./channel/mergeState.js" import * as mergeStrategy_ from "./channel/mergeStrategy.js" import * as singleProducerAsyncInput from "./channel/singleProducerAsyncInput.js" import * as coreEffect from "./core-effect.js" import * as core from "./core-stream.js" import * as MergeDecisionOpCodes from "./opCodes/channelMergeDecision.js" import * as MergeStateOpCodes from "./opCodes/channelMergeState.js" import * as ChannelStateOpCodes from "./opCodes/channelState.js" import * as tracer from "./tracer.js" /** @internal */ export const acquireUseRelease = ( acquire: Effect.Effect, use: (a: Acquired) => Channel.Channel, release: (a: Acquired, exit: Exit.Exit) => Effect.Effect ): Channel.Channel => core.flatMap( core.fromEffect( Ref.make< (exit: Exit.Exit) => Effect.Effect >(() => Effect.void) ), (ref) => pipe( core.fromEffect( Effect.uninterruptible( Effect.tap( acquire, (a) => Ref.set(ref, (exit) => release(a, exit)) ) ) ), core.flatMap(use), core.ensuringWith((exit) => Effect.flatMap(Ref.get(ref), (f) => f(exit))) ) ) /** @internal */ export const as = dual< ( value: OutDone2 ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, value: OutDone2 ) => Channel.Channel >(2, ( self: Channel.Channel, value: OutDone2 ): Channel.Channel => map(self, () => value)) /** @internal */ export const asVoid = ( self: Channel.Channel ): Channel.Channel => map(self, constVoid) /** @internal */ export const buffer = ( options: { readonly empty: InElem readonly isEmpty: Predicate readonly ref: Ref.Ref } ): Channel.Channel => core.suspend(() => { const doBuffer = ( empty: InElem, isEmpty: Predicate, ref: Ref.Ref ): Channel.Channel => unwrap( Ref.modify(ref, (inElem) => isEmpty(inElem) ? [ core.readWith({ onInput: (input: InElem) => core.flatMap( core.write(input), () => doBuffer(empty, isEmpty, ref) ), onFailure: (error: InErr) => core.fail(error), onDone: (done: InDone) => core.succeedNow(done) }), inElem ] as const : [ core.flatMap( core.write(inElem), () => doBuffer(empty, isEmpty, ref) ), empty ] as const) ) return doBuffer(options.empty, options.isEmpty, options.ref) }) /** @internal */ export const bufferChunk = ( ref: Ref.Ref> ): Channel.Channel, Chunk.Chunk, InErr, InErr, InDone, InDone> => buffer({ empty: Chunk.empty(), isEmpty: Chunk.isEmpty, ref }) /** @internal */ export const catchAll = dual< ( f: (error: OutErr) => Channel.Channel ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1, InErr & InErr1, OutDone1 | OutDone, InDone & InDone1, Env1 | Env >, ( self: Channel.Channel, f: (error: OutErr) => Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1, InErr & InErr1, OutDone1 | OutDone, InDone & InDone1, Env1 | Env > >( 2, ( self: Channel.Channel, f: (error: OutErr) => Channel.Channel ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr1, InErr & InErr1, OutDone | OutDone1, InDone & InDone1, Env | Env1 > => core.catchAllCause(self, (cause) => Either.match(Cause.failureOrCause(cause), { onLeft: f, onRight: core.failCause })) ) /** @internal */ export const concatMap = dual< ( f: (o: OutElem) => Channel.Channel ) => ( self: Channel.Channel ) => Channel.Channel< OutElem2, InElem & InElem2, OutErr2 | OutErr, InErr & InErr2, unknown, InDone & InDone2, Env2 | Env >, ( self: Channel.Channel, f: (o: OutElem) => Channel.Channel ) => Channel.Channel< OutElem2, InElem & InElem2, OutErr2 | OutErr, InErr & InErr2, unknown, InDone & InDone2, Env2 | Env > >(2, ( self: Channel.Channel, f: (o: OutElem) => Channel.Channel ): Channel.Channel< OutElem2, InElem & InElem2, OutErr | OutErr2, InErr & InErr2, unknown, InDone & InDone2, Env | Env2 > => core.concatMapWith(self, f, () => void 0, () => void 0)) /** @internal */ export const collect = dual< ( pf: (o: OutElem) => Option.Option ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, pf: (o: OutElem) => Option.Option ) => Channel.Channel >(2, ( self: Channel.Channel, pf: (o: OutElem) => Option.Option ): Channel.Channel => { const collector: Channel.Channel = core .readWith({ onInput: (out) => Option.match(pf(out), { onNone: () => collector, onSome: (out2) => core.flatMap(core.write(out2), () => collector) }), onFailure: core.fail, onDone: core.succeedNow }) return core.pipeTo(self, collector) }) /** @internal */ export const concatOut = ( self: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone, InDone, Env > ): Channel.Channel => core.concatAll(self) /** @internal */ export const mapInput = dual< ( f: (a: InDone0) => InDone ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (a: InDone0) => InDone ) => Channel.Channel >(2, ( self: Channel.Channel, f: (a: InDone0) => InDone ): Channel.Channel => { const reader: Channel.Channel = core.readWith({ onInput: (inElem: InElem) => core.flatMap(core.write(inElem), () => reader), onFailure: core.fail, onDone: (done: InDone0) => core.succeedNow(f(done)) }) return core.pipeTo(reader, self) }) /** @internal */ export const mapInputEffect = dual< ( f: (i: InDone0) => Effect.Effect ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (i: InDone0) => Effect.Effect ) => Channel.Channel >(2, ( self: Channel.Channel, f: (i: InDone0) => Effect.Effect ): Channel.Channel => { const reader: Channel.Channel = core.readWith({ onInput: (inElem) => core.flatMap(core.write(inElem), () => reader), onFailure: core.fail, onDone: (done) => core.fromEffect(f(done)) }) return core.pipeTo(reader, self) }) /** @internal */ export const mapInputError = dual< ( f: (a: InErr0) => InErr ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (a: InErr0) => InErr ) => Channel.Channel >(2, ( self: Channel.Channel, f: (a: InErr0) => InErr ): Channel.Channel => { const reader: Channel.Channel = core.readWith({ onInput: (inElem: InElem) => core.flatMap(core.write(inElem), () => reader), onFailure: (error) => core.fail(f(error)), onDone: core.succeedNow }) return core.pipeTo(reader, self) }) /** @internal */ export const mapInputErrorEffect = dual< ( f: (error: InErr0) => Effect.Effect ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (error: InErr0) => Effect.Effect ) => Channel.Channel >(2, ( self: Channel.Channel, f: (error: InErr0) => Effect.Effect ): Channel.Channel => { const reader: Channel.Channel = core.readWith({ onInput: (inElem) => core.flatMap(core.write(inElem), () => reader), onFailure: (error) => core.fromEffect(f(error)), onDone: core.succeedNow }) return core.pipeTo(reader, self) }) /** @internal */ export const mapInputIn = dual< ( f: (a: InElem0) => InElem ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (a: InElem0) => InElem ) => Channel.Channel >(2, ( self: Channel.Channel, f: (a: InElem0) => InElem ): Channel.Channel => { const reader: Channel.Channel = core.readWith({ onInput: (inElem) => core.flatMap(core.write(f(inElem)), () => reader), onFailure: core.fail, onDone: core.succeedNow }) return core.pipeTo(reader, self) }) /** @internal */ export const mapInputInEffect = dual< ( f: (a: InElem0) => Effect.Effect ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (a: InElem0) => Effect.Effect ) => Channel.Channel >(2, ( self: Channel.Channel, f: (a: InElem0) => Effect.Effect ): Channel.Channel => { const reader: Channel.Channel = core.readWith({ onInput: (inElem) => core.flatMap(core.flatMap(core.fromEffect(f(inElem)), core.write), () => reader), onFailure: core.fail, onDone: core.succeedNow }) return core.pipeTo(reader, self) }) /** @internal */ export const doneCollect = ( self: Channel.Channel ): Channel.Channel, OutDone], InDone, Env> => core.suspend(() => { const builder: Array = [] return pipe( core.pipeTo(self, doneCollectReader(builder)), core.flatMap((outDone) => core.succeed([Chunk.unsafeFromArray(builder), outDone])) ) }) /** @internal */ const doneCollectReader = ( builder: Array ): Channel.Channel => { return core.readWith({ onInput: (outElem) => core.flatMap( core.sync(() => { builder.push(outElem) }), () => doneCollectReader(builder) ), onFailure: core.fail, onDone: core.succeed }) } /** @internal */ export const drain = ( self: Channel.Channel ): Channel.Channel => { const drainer: Channel.Channel = core .readWithCause({ onInput: () => drainer, onFailure: core.failCause, onDone: core.succeed }) return core.pipeTo(self, drainer) } /** @internal */ export const emitCollect = ( self: Channel.Channel ): Channel.Channel<[Chunk.Chunk, OutDone], InElem, OutErr, InErr, void, InDone, Env> => core.flatMap(doneCollect(self), core.write) /** @internal */ export const ensuring = dual< ( finalizer: Effect.Effect ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, finalizer: Effect.Effect ) => Channel.Channel >(2, ( self: Channel.Channel, finalizer: Effect.Effect ): Channel.Channel => core.ensuringWith(self, () => finalizer)) /** @internal */ export const context = (): Channel.Channel, unknown, Env> => core.fromEffect(Effect.context()) /** @internal */ export const contextWith = ( f: (env: Context.Context) => OutDone ): Channel.Channel => map(context(), f) /** @internal */ export const contextWithChannel = < Env, OutElem, InElem, OutErr, InErr, OutDone, InDone, Env1 >( f: (env: Context.Context) => Channel.Channel ): Channel.Channel => core.flatMap(context(), f) /** @internal */ export const contextWithEffect = ( f: (env: Context.Context) => Effect.Effect ): Channel.Channel => mapEffect(context(), f) /** @internal */ export const flatten = < OutElem, InElem, OutErr, InErr, OutElem1, InElem1, OutErr1, InErr1, OutDone2, InDone1, Env1, InDone, Env >( self: Channel.Channel< OutElem, InElem, OutErr, InErr, Channel.Channel, InDone, Env > ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone2, InDone & InDone1, Env | Env1 > => core.flatMap(self, identity) /** @internal */ export const foldChannel = dual< < OutErr, OutElem1, InElem1, OutErr1, InErr1, OutDone1, InDone1, Env1, OutDone, OutElem2, InElem2, OutErr2, InErr2, OutDone2, InDone2, Env2 >( options: { readonly onFailure: ( error: OutErr ) => Channel.Channel readonly onSuccess: ( done: OutDone ) => Channel.Channel } ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem2 | OutElem, InElem & InElem1 & InElem2, OutErr1 | OutErr2, InErr & InErr1 & InErr2, OutDone1 | OutDone2, InDone & InDone1 & InDone2, Env1 | Env2 | Env >, < OutElem, InElem, OutErr, InErr, OutDone, InDone, Env, OutElem1, InElem1, OutErr1, InErr1, OutDone1, InDone1, Env1, OutElem2, InElem2, OutErr2, InErr2, OutDone2, InDone2, Env2 >( self: Channel.Channel, options: { readonly onFailure: ( error: OutErr ) => Channel.Channel readonly onSuccess: ( done: OutDone ) => Channel.Channel } ) => Channel.Channel< OutElem1 | OutElem2 | OutElem, InElem & InElem1 & InElem2, OutErr1 | OutErr2, InErr & InErr1 & InErr2, OutDone1 | OutDone2, InDone & InDone1 & InDone2, Env1 | Env2 | Env > >(2, < OutElem, InElem, OutErr, InErr, OutDone, InDone, Env, OutElem1, InElem1, OutErr1, InErr1, OutDone1, InDone1, Env1, OutElem2, InElem2, OutErr2, InErr2, OutDone2, InDone2, Env2 >( self: Channel.Channel, options: { readonly onFailure: (error: OutErr) => Channel.Channel readonly onSuccess: (done: OutDone) => Channel.Channel } ): Channel.Channel< OutElem | OutElem2 | OutElem1, InElem & InElem1 & InElem2, OutErr2 | OutErr1, InErr & InErr1 & InErr2, OutDone2 | OutDone1, InDone & InDone1 & InDone2, Env | Env1 | Env2 > => core.foldCauseChannel(self, { onFailure: (cause) => { const either = Cause.failureOrCause(cause) switch (either._tag) { case "Left": { return options.onFailure(either.left) } case "Right": { return core.failCause(either.right) } } }, onSuccess: options.onSuccess })) /** @internal */ export const fromEither = ( either: Either.Either ): Channel.Channel => core.suspend(() => Either.match(either, { onLeft: core.fail, onRight: core.succeed })) /** @internal */ export const fromInput = ( input: SingleProducerAsyncInput.AsyncInputConsumer ): Channel.Channel => unwrap( input.takeWith( core.failCause, (elem) => core.flatMap(core.write(elem), () => fromInput(input)), core.succeed ) ) /** @internal */ export const fromPubSub = ( pubsub: PubSub.PubSub>> ): Channel.Channel => unwrapScoped(Effect.map(PubSub.subscribe(pubsub), fromQueue)) /** @internal */ export const fromPubSubScoped = ( pubsub: PubSub.PubSub>> ): Effect.Effect, never, Scope.Scope> => Effect.map(PubSub.subscribe(pubsub), fromQueue) /** @internal */ export const fromOption = ( option: Option.Option ): Channel.Channel, unknown, A, unknown> => core.suspend(() => Option.match(option, { onNone: () => core.fail(Option.none()), onSome: core.succeed }) ) /** @internal */ export const fromQueue = ( queue: Queue.Dequeue>> ): Channel.Channel => core.suspend(() => fromQueueInternal(queue)) /** @internal */ const fromQueueInternal = ( queue: Queue.Dequeue>> ): Channel.Channel => pipe( core.fromEffect(Queue.take(queue)), core.flatMap(Either.match({ onLeft: Exit.match({ onFailure: core.failCause, onSuccess: core.succeedNow }), onRight: (elem) => core.flatMap( core.write(elem), () => fromQueueInternal(queue) ) })) ) /** @internal */ export const identityChannel = (): Channel.Channel => core.readWith({ onInput: (input: Elem) => core.flatMap(core.write(input), () => identityChannel()), onFailure: core.fail, onDone: core.succeedNow }) /** @internal */ export const interruptWhen = dual< ( effect: Effect.Effect ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, effect: Effect.Effect ) => Channel.Channel >(2, ( self: Channel.Channel, effect: Effect.Effect ): Channel.Channel => mergeWith(self, { other: core.fromEffect(effect), onSelfDone: (selfDone) => mergeDecision.Done(Effect.suspend(() => selfDone)), onOtherDone: (effectDone) => mergeDecision.Done(Effect.suspend(() => effectDone)) })) /** @internal */ export const interruptWhenDeferred = dual< ( deferred: Deferred.Deferred ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, deferred: Deferred.Deferred ) => Channel.Channel >(2, ( self: Channel.Channel, deferred: Deferred.Deferred ): Channel.Channel => interruptWhen(self, Deferred.await(deferred))) /** @internal */ export const map = dual< ( f: (out: OutDone) => OutDone2 ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (out: OutDone) => OutDone2 ) => Channel.Channel >(2, ( self: Channel.Channel, f: (out: OutDone) => OutDone2 ): Channel.Channel => core.flatMap(self, (a) => core.sync(() => f(a)))) /** @internal */ export const mapEffect = dual< ( f: (o: OutDone) => Effect.Effect ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (o: OutDone) => Effect.Effect ) => Channel.Channel >(2, ( self: Channel.Channel, f: (o: OutDone) => Effect.Effect ): Channel.Channel => core.flatMap(self, (z) => core.fromEffect(f(z)))) /** @internal */ export const mapError = dual< ( f: (err: OutErr) => OutErr2 ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (err: OutErr) => OutErr2 ) => Channel.Channel >(2, ( self: Channel.Channel, f: (err: OutErr) => OutErr2 ): Channel.Channel => mapErrorCause(self, Cause.map(f))) /** @internal */ export const mapErrorCause = dual< ( f: (cause: Cause.Cause) => Cause.Cause ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (cause: Cause.Cause) => Cause.Cause ) => Channel.Channel >(2, ( self: Channel.Channel, f: (cause: Cause.Cause) => Cause.Cause ): Channel.Channel => core.catchAllCause(self, (cause) => core.failCause(f(cause)))) /** @internal */ export const mapOut = dual< ( f: (o: OutElem) => OutElem2 ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (o: OutElem) => OutElem2 ) => Channel.Channel >(2, ( self: Channel.Channel, f: (o: OutElem) => OutElem2 ): Channel.Channel => { const reader: Channel.Channel = core .readWith({ onInput: (outElem) => core.flatMap(core.write(f(outElem)), () => reader), onFailure: core.fail, onDone: core.succeedNow }) return core.pipeTo(self, reader) }) /** @internal */ export const mapOutEffect = dual< ( f: (o: OutElem) => Effect.Effect ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (o: OutElem) => Effect.Effect ) => Channel.Channel >(2, ( self: Channel.Channel, f: (o: OutElem) => Effect.Effect ): Channel.Channel => { const reader: Channel.Channel = core .readWithCause({ onInput: (outElem) => pipe( core.fromEffect(f(outElem)), core.flatMap(core.write), core.flatMap(() => reader) ), onFailure: core.failCause, onDone: core.succeedNow }) return core.pipeTo(self, reader) }) /** @internal */ export const mapOutEffectPar = dual< ( f: (o: OutElem) => Effect.Effect, n: number ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (o: OutElem) => Effect.Effect, n: number ) => Channel.Channel >(3, ( self: Channel.Channel, f: (o: OutElem) => Effect.Effect, n: number ): Channel.Channel => unwrapScopedWith( (scope) => Effect.gen(function*() { const input = yield* singleProducerAsyncInput.make() const queueReader = fromInput(input) const queue = yield* Queue.bounded, OutErr | OutErr1, Env1>>(n) yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) const errorSignal = yield* Deferred.make() const withPermits = n === Number.POSITIVE_INFINITY ? ((_: number) => identity) : (yield* Effect.makeSemaphore(n)).withPermits const pull = yield* queueReader.pipe(core.pipeTo(self), toPullIn(scope)) yield* pull.pipe( Effect.matchCauseEffect({ onFailure: (cause) => Queue.offer(queue, Effect.failCause(cause)), onSuccess: Either.match({ onLeft: (outDone) => Effect.zipRight( Effect.interruptible(withPermits(n)(Effect.void)), Effect.asVoid(Queue.offer(queue, Effect.succeed(Either.left(outDone)))) ), onRight: (outElem) => Effect.gen(function*() { const deferred = yield* Deferred.make() const latch = yield* Deferred.make() yield* Queue.offer(queue, Effect.map(Deferred.await(deferred), Either.right)) yield* Deferred.succeed(latch, void 0).pipe( Effect.zipRight( Effect.uninterruptibleMask((restore) => Effect.exit(restore(Deferred.await(errorSignal))).pipe( Effect.raceFirst(Effect.exit(restore(f(outElem)))), Effect.flatMap(identity) ) ).pipe( Effect.tapErrorCause((cause) => Deferred.failCause(errorSignal, cause)), Effect.intoDeferred(deferred) ) ), withPermits(1), Effect.forkIn(scope) ) yield* Deferred.await(latch) }) }) }), Effect.forever, Effect.interruptible, Effect.forkIn(scope) ) const consumer: Channel.Channel = unwrap( Effect.matchCause(Effect.flatten(Queue.take(queue)), { onFailure: core.failCause, onSuccess: Either.match({ onLeft: core.succeedNow, onRight: (outElem) => core.flatMap(core.write(outElem), () => consumer) }) }) ) return core.embedInput(consumer, input) }) )) /** @internal */ export const mergeAll = ( options: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined } ) => { return < OutElem, InElem1, OutErr1, InErr1, InDone1, Env1, InElem, OutErr, InErr, InDone, Env >( channels: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, unknown, InDone, Env > ): Channel.Channel< OutElem, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, unknown, InDone & InDone1, Env | Env1 > => mergeAllWith(options)(channels, constVoid) } /** @internal */ export const mergeAllUnbounded = < OutElem, InElem1, OutErr1, InErr1, InDone1, Env1, InElem, OutErr, InErr, InDone, Env >( channels: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, unknown, InDone, Env > ): Channel.Channel< OutElem, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, unknown, InDone & InDone1, Env | Env1 > => mergeAllWith({ concurrency: "unbounded" })(channels, constVoid) /** @internal */ export const mergeAllUnboundedWith = < OutElem, InElem1, OutErr1, InErr1, OutDone, InDone1, Env1, InElem, OutErr, InErr, InDone, Env >( channels: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone, InDone, Env >, f: (o1: OutDone, o2: OutDone) => OutDone ): Channel.Channel< OutElem, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone, InDone & InDone1, Env | Env1 > => mergeAllWith({ concurrency: "unbounded" })(channels, f) /** @internal */ export const mergeAllWith = ( { bufferSize = 16, concurrency, mergeStrategy = mergeStrategy_.BackPressure() }: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined } ) => ( channels: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone, InDone, Env >, f: (o1: OutDone, o2: OutDone) => OutDone ): Channel.Channel< OutElem, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone, InDone & InDone1, Env | Env1 > => unwrapScopedWith( (scope) => Effect.gen(function*() { const concurrencyN = concurrency === "unbounded" ? Number.MAX_SAFE_INTEGER : concurrency const input = yield* singleProducerAsyncInput.make< InErr & InErr1, InElem & InElem1, InDone & InDone1 >() const queueReader = fromInput(input) const queue = yield* Queue.bounded, OutErr | OutErr1, Env>>( bufferSize ) yield* Scope.addFinalizer(scope, Queue.shutdown(queue)) const cancelers = yield* Queue.unbounded>() yield* Scope.addFinalizer(scope, Queue.shutdown(cancelers)) const lastDone = yield* Ref.make>(Option.none()) const errorSignal = yield* Deferred.make() const withPermits = (yield* Effect.makeSemaphore(concurrencyN)).withPermits const pull = yield* toPullIn(core.pipeTo(queueReader, channels), scope) function evaluatePull( pull: Effect.Effect< Either.Either, OutErr | OutErr1, Env | Env1 > ) { return pull.pipe( Effect.flatMap(Either.match({ onLeft: (done) => Effect.succeed(Option.some(done)), onRight: (outElem) => Effect.as( Queue.offer(queue, Effect.succeed(Either.right(outElem))), Option.none() ) })), Effect.repeat({ until: (_): _ is Option.Some => Option.isSome(_) }), Effect.flatMap((outDone) => Ref.update( lastDone, Option.match({ onNone: () => Option.some(outDone.value), onSome: (lastDone) => Option.some(f(lastDone, outDone.value)) }) ) ), Effect.catchAllCause((cause) => Cause.isInterrupted(cause) ? Effect.failCause(cause) : Queue.offer(queue, Effect.failCause(cause)).pipe( Effect.zipRight(Deferred.succeed(errorSignal, void 0)), Effect.asVoid ) ) ) } yield* pull.pipe( Effect.matchCauseEffect({ onFailure: (cause) => Queue.offer(queue, Effect.failCause(cause)).pipe( Effect.zipRight(Effect.succeed(false)) ), onSuccess: Either.match({ onLeft: (outDone) => Effect.raceWith( Effect.interruptible(Deferred.await(errorSignal)), Effect.interruptible(withPermits(concurrencyN)(Effect.void)), { onSelfDone: (_, permitAcquisition) => Effect.as(Fiber.interrupt(permitAcquisition), false), onOtherDone: (_, failureAwait) => Effect.zipRight( Fiber.interrupt(failureAwait), Ref.get(lastDone).pipe( Effect.flatMap(Option.match({ onNone: () => Queue.offer(queue, Effect.succeed(Either.left(outDone))), onSome: (lastDone) => Queue.offer(queue, Effect.succeed(Either.left(f(lastDone, outDone)))) })), Effect.as(false) ) ) } ), onRight: (channel) => mergeStrategy_.match(mergeStrategy, { onBackPressure: () => Effect.gen(function*() { const latch = yield* Deferred.make() const raceEffects = Effect.scopedWith((scope) => toPullIn(core.pipeTo(queueReader, channel), scope).pipe( Effect.flatMap((pull) => Effect.race( Effect.exit(evaluatePull(pull)), Effect.exit(Effect.interruptible(Deferred.await(errorSignal))) ) ), Effect.flatMap(identity) ) ) yield* Deferred.succeed(latch, void 0).pipe( Effect.zipRight(raceEffects), withPermits(1), Effect.forkIn(scope) ) yield* Deferred.await(latch) const errored = yield* Deferred.isDone(errorSignal) return !errored }), onBufferSliding: () => Effect.gen(function*() { const canceler = yield* Deferred.make() const latch = yield* Deferred.make() const size = yield* Queue.size(cancelers) yield* Queue.take(cancelers).pipe( Effect.flatMap((canceler) => Deferred.succeed(canceler, void 0)), Effect.when(() => size >= concurrencyN) ) yield* Queue.offer(cancelers, canceler) const raceEffects = Effect.scopedWith((scope) => toPullIn(core.pipeTo(queueReader, channel), scope).pipe( Effect.flatMap((pull) => Effect.exit(evaluatePull(pull)).pipe( Effect.race(Effect.exit(Effect.interruptible(Deferred.await(errorSignal)))), Effect.race(Effect.exit(Effect.interruptible(Deferred.await(canceler)))) ) ), Effect.flatMap(identity) ) ) yield* Deferred.succeed(latch, void 0).pipe( Effect.zipRight(raceEffects), withPermits(1), Effect.forkIn(scope) ) yield* Deferred.await(latch) const errored = yield* Deferred.isDone(errorSignal) return !errored }) }) }) }), Effect.repeat({ while: (_) => _ }), Effect.forkIn(scope) ) const consumer: Channel.Channel = pipe( Queue.take(queue), Effect.flatten, Effect.matchCause({ onFailure: core.failCause, onSuccess: Either.match({ onLeft: core.succeedNow, onRight: (outElem) => core.flatMap(core.write(outElem), () => consumer) }) }), unwrap ) return core.embedInput(consumer, input) }) ) /** @internal */ export const mergeMap = dual< ( f: (outElem: OutElem) => Channel.Channel, options: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined } ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, unknown, InDone & InDone1, Env1 | Env >, ( self: Channel.Channel, f: (outElem: OutElem) => Channel.Channel, options: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined } ) => Channel.Channel< OutElem1, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, unknown, InDone & InDone1, Env1 | Env > >(3, ( self: Channel.Channel, f: (outElem: OutElem) => Channel.Channel, options: { readonly concurrency: number | "unbounded" readonly bufferSize?: number | undefined readonly mergeStrategy?: MergeStrategy.MergeStrategy | undefined } ): Channel.Channel< OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, unknown, InDone & InDone1, Env | Env1 > => mergeAll(options)(mapOut(self, f))) /** @internal */ export const mergeOut = dual< ( n: number ) => ( self: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone, InDone, Env > ) => Channel.Channel< OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, unknown, InDone & InDone1, Env | Env1 >, ( self: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone, InDone, Env >, n: number ) => Channel.Channel< OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, unknown, InDone & InDone1, Env | Env1 > >(2, ( self: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone, InDone, Env >, n: number ): Channel.Channel< OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, unknown, InDone & InDone1, Env | Env1 > => mergeAll({ concurrency: n })(mapOut(self, identity))) /** @internal */ export const mergeOutWith = dual< ( n: number, f: (o1: OutDone1, o2: OutDone1) => OutDone1 ) => ( self: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone1, InDone, Env > ) => Channel.Channel< OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone1, InDone & InDone1, Env | Env1 >, ( self: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone1, InDone, Env >, n: number, f: (o1: OutDone1, o2: OutDone1) => OutDone1 ) => Channel.Channel< OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone1, InDone & InDone1, Env | Env1 > >(3, ( self: Channel.Channel< Channel.Channel, InElem, OutErr, InErr, OutDone1, InDone, Env >, n: number, f: (o1: OutDone1, o2: OutDone1) => OutDone1 ): Channel.Channel< OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone1, InDone & InDone1, Env | Env1 > => mergeAllWith({ concurrency: n })(mapOut(self, identity), f)) /** @internal */ export const mergeWith = dual< ( options: { readonly other: Channel.Channel readonly onSelfDone: ( exit: Exit.Exit ) => MergeDecision.MergeDecision readonly onOtherDone: ( ex: Exit.Exit ) => MergeDecision.MergeDecision } ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr2 | OutErr3, InErr & InErr1, OutDone2 | OutDone3, InDone & InDone1, Env1 | Env >, < OutElem, InElem, OutErr, InErr, OutDone, InDone, Env, OutElem1, InElem1, OutErr1, InErr1, OutDone1, InDone1, Env1, OutErr2, OutDone2, OutErr3, OutDone3 >( self: Channel.Channel, options: { readonly other: Channel.Channel readonly onSelfDone: ( exit: Exit.Exit ) => MergeDecision.MergeDecision readonly onOtherDone: ( ex: Exit.Exit ) => MergeDecision.MergeDecision } ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr2 | OutErr3, InErr & InErr1, OutDone2 | OutDone3, InDone & InDone1, Env1 | Env > >(2, < OutElem, InElem, OutErr, InErr, OutDone, InDone, Env, OutElem1, InElem1, OutErr1, InErr1, OutDone1, InDone1, Env1, OutErr2, OutDone2, OutErr3, OutDone3 >( self: Channel.Channel, options: { readonly other: Channel.Channel readonly onSelfDone: ( exit: Exit.Exit ) => MergeDecision.MergeDecision readonly onOtherDone: ( ex: Exit.Exit ) => MergeDecision.MergeDecision } ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr2 | OutErr3, InErr & InErr1, OutDone2 | OutDone3, InDone & InDone1, Env1 | Env > => { function merge(scope: Scope.Scope) { return Effect.gen(function*() { type State = MergeState.MergeState< Env | Env1, OutErr, OutErr1, OutErr2 | OutErr3, OutElem | OutElem1, OutDone, OutDone1, OutDone2 | OutDone3 > const input = yield* singleProducerAsyncInput.make< InErr & InErr1, InElem & InElem1, InDone & InDone1 >() const queueReader = fromInput(input) const pullL = yield* toPullIn(core.pipeTo(queueReader, self), scope) const pullR = yield* toPullIn(core.pipeTo(queueReader, options.other), scope) function handleSide( exit: Exit.Exit, Err>, fiber: Fiber.Fiber, Err2>, pull: Effect.Effect, Err, Env | Env1> ) { return ( done: ( ex: Exit.Exit ) => MergeDecision.MergeDecision< Env | Env1, Err2, Done2, OutErr2 | OutErr3, OutDone2 | OutDone3 >, both: ( f1: Fiber.Fiber, Err>, f2: Fiber.Fiber, Err2> ) => State, single: ( f: ( ex: Exit.Exit ) => Effect.Effect ) => State ): Effect.Effect< Channel.Channel< OutElem | OutElem1, unknown, OutErr2 | OutErr3, unknown, OutDone2 | OutDone3, unknown, Env | Env1 >, never, Env | Env1 > => { function onDecision( decision: MergeDecision.MergeDecision< Env | Env1, Err2, Done2, OutErr2 | OutErr3, OutDone2 | OutDone3 > ): Effect.Effect< Channel.Channel< OutElem | OutElem1, unknown, OutErr2 | OutErr3, unknown, OutDone2 | OutDone3, unknown, Env | Env1 > > { const op = decision as mergeDecision.Primitive if (op._tag === MergeDecisionOpCodes.OP_DONE) { return Effect.succeed( core.fromEffect( Effect.zipRight( Fiber.interrupt(fiber), op.effect ) ) ) } return Effect.map( Fiber.await(fiber), Exit.match({ onFailure: (cause) => core.fromEffect(op.f(Exit.failCause(cause))), onSuccess: Either.match({ onLeft: (done) => core.fromEffect(op.f(Exit.succeed(done))), onRight: (elem) => zipRight(core.write(elem), go(single(op.f))) }) }) ) } return Exit.match(exit, { onFailure: (cause) => onDecision(done(Exit.failCause(cause))), onSuccess: Either.match({ onLeft: (z) => onDecision(done(Exit.succeed(z))), onRight: (elem) => Effect.succeed( core.flatMap(core.write(elem), () => core.flatMap( core.fromEffect(Effect.forkIn(Effect.interruptible(pull), scope)), (leftFiber) => go(both(leftFiber, fiber)) )) ) }) }) } } function go( state: State ): Channel.Channel< OutElem | OutElem1, unknown, OutErr2 | OutErr3, unknown, OutDone2 | OutDone3, unknown, Env | Env1 > { switch (state._tag) { case MergeStateOpCodes.OP_BOTH_RUNNING: { const leftJoin = Effect.interruptible(Fiber.join(state.left)) const rightJoin = Effect.interruptible(Fiber.join(state.right)) return unwrap( Effect.raceWith(leftJoin, rightJoin, { onSelfDone: (leftExit, rf) => Effect.zipRight( Fiber.interrupt(rf), handleSide(leftExit, state.right, pullL)( options.onSelfDone, mergeState.BothRunning, (f) => mergeState.LeftDone(f) ) ), onOtherDone: (rightExit, lf) => Effect.zipRight( Fiber.interrupt(lf), handleSide(rightExit, state.left, pullR)( options.onOtherDone as ( ex: Exit.Exit ) => MergeDecision.MergeDecision< Env1 | Env, OutErr, OutDone, OutErr2 | OutErr3, OutDone2 | OutDone3 >, (left, right) => mergeState.BothRunning(right, left), (f) => mergeState.RightDone(f) ) ) }) ) } case MergeStateOpCodes.OP_LEFT_DONE: { return unwrap( Effect.map( Effect.exit(pullR), Exit.match({ onFailure: (cause) => core.fromEffect(state.f(Exit.failCause(cause))), onSuccess: Either.match({ onLeft: (done) => core.fromEffect(state.f(Exit.succeed(done))), onRight: (elem) => core.flatMap( core.write(elem), () => go(mergeState.LeftDone(state.f)) ) }) }) ) ) } case MergeStateOpCodes.OP_RIGHT_DONE: { return unwrap( Effect.map( Effect.exit(pullL), Exit.match({ onFailure: (cause) => core.fromEffect(state.f(Exit.failCause(cause))), onSuccess: Either.match({ onLeft: (done) => core.fromEffect(state.f(Exit.succeed(done))), onRight: (elem) => core.flatMap( core.write(elem), () => go(mergeState.RightDone(state.f)) ) }) }) ) ) } } } return core.fromEffect( Effect.withFiberRuntime< MergeState.MergeState< Env | Env1, OutErr, OutErr1, OutErr2 | OutErr3, OutElem | OutElem1, OutDone, OutDone1, OutDone2 | OutDone3 >, never, Env | Env1 >((parent) => { const inherit = Effect.withFiberRuntime((state) => { ;(state as any).transferChildren((parent as any).scope()) return Effect.void }) const leftFiber = Effect.interruptible(pullL).pipe( Effect.ensuring(inherit), Effect.forkIn(scope) ) const rightFiber = Effect.interruptible(pullR).pipe( Effect.ensuring(inherit), Effect.forkIn(scope) ) return Effect.zipWith( leftFiber, rightFiber, (left, right): State => mergeState.BothRunning< Env | Env1, OutErr, OutErr1, OutErr2 | OutErr3, OutElem | OutElem1, OutDone, OutDone1, OutDone2 | OutDone3 >(left, right) ) }) ).pipe( core.flatMap(go), core.embedInput(input) ) }) } return unwrapScopedWith(merge) }) /** @internal */ export const never: Channel.Channel = core.fromEffect( Effect.never ) /** @internal */ export const orDie = dual< ( error: LazyArg ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, error: LazyArg ) => Channel.Channel >(2, ( self: Channel.Channel, error: LazyArg ): Channel.Channel => orDieWith(self, error)) /** @internal */ export const orDieWith = dual< ( f: (e: OutErr) => unknown ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (e: OutErr) => unknown ) => Channel.Channel >(2, ( self: Channel.Channel, f: (e: OutErr) => unknown ): Channel.Channel => catchAll(self, (e) => core.failCauseSync(() => Cause.die(f(e)))) as Channel.Channel< OutElem, InElem, never, InErr, OutDone, InDone, Env >) /** @internal */ export const orElse = dual< ( that: LazyArg> ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1, InErr & InErr1, OutDone1 | OutDone, InDone & InDone1, Env1 | Env >, ( self: Channel.Channel, that: LazyArg> ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1, InErr & InErr1, OutDone1 | OutDone, InDone & InDone1, Env1 | Env > >( 2, ( self: Channel.Channel, that: LazyArg> ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr1, InErr & InErr1, OutDone | OutDone1, InDone & InDone1, Env | Env1 > => catchAll(self, that) ) /** @internal */ export const pipeToOrFail = dual< ( that: Channel.Channel ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, that: Channel.Channel ) => Channel.Channel >(2, ( self: Channel.Channel, that: Channel.Channel ): Channel.Channel => core.suspend(() => { let channelException: Channel.ChannelException | undefined = undefined const reader: Channel.Channel = core .readWith({ onInput: (outElem) => core.flatMap(core.write(outElem), () => reader), onFailure: (outErr) => { channelException = ChannelException(outErr) return core.failCause(Cause.die(channelException)) }, onDone: core.succeedNow }) const writer: Channel.Channel< OutElem2, OutElem2, OutErr2, OutErr2, OutDone2, OutDone2, Env2 > = core.readWithCause({ onInput: (outElem) => pipe(core.write(outElem), core.flatMap(() => writer)), onFailure: (cause) => Cause.isDieType(cause) && isChannelException(cause.defect) && Equal.equals(cause.defect, channelException) ? core.fail(cause.defect.error as OutErr2) : core.failCause(cause), onDone: core.succeedNow }) return core.pipeTo(core.pipeTo(core.pipeTo(self, reader), that), writer) })) /** @internal */ export const provideService = dual< ( tag: Context.Tag, service: Types.NoInfer ) => ( self: Channel.Channel ) => Channel.Channel>, ( self: Channel.Channel, tag: Context.Tag, service: Types.NoInfer ) => Channel.Channel> >(3, ( self: Channel.Channel, tag: Context.Tag, service: Types.NoInfer ): Channel.Channel> => { return core.flatMap( context(), (context) => core.provideContext(self, Context.add(context, tag, service)) ) }) /** @internal */ export const provideLayer = dual< ( layer: Layer.Layer ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, layer: Layer.Layer ) => Channel.Channel >(2, ( self: Channel.Channel, layer: Layer.Layer ): Channel.Channel => unwrapScopedWith((scope) => Effect.map(Layer.buildWithScope(layer, scope), (context) => core.provideContext(self, context)) )) /** @internal */ export const mapInputContext = dual< ( f: (env: Context.Context) => Context.Context ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, f: (env: Context.Context) => Context.Context ) => Channel.Channel >(2, ( self: Channel.Channel, f: (env: Context.Context) => Context.Context ): Channel.Channel => contextWithChannel((context: Context.Context) => core.provideContext(self, f(context)))) /** @internal */ export const provideSomeLayer = dual< ( layer: Layer.Layer ) => ( self: Channel.Channel ) => Channel.Channel>, ( self: Channel.Channel, layer: Layer.Layer ) => Channel.Channel> >(2, ( self: Channel.Channel, layer: Layer.Layer ): Channel.Channel> => // @ts-expect-error provideLayer(self, Layer.merge(Layer.context>(), layer))) /** @internal */ export const read = (): Channel.Channel, unknown, In, unknown> => core.readOrFail, In>(Option.none()) /** @internal */ export const repeated = ( self: Channel.Channel ): Channel.Channel => core.flatMap(self, () => repeated(self)) /** @internal */ export const run = ( self: Channel.Channel ): Effect.Effect => Effect.scopedWith((scope) => executor.runIn(self, scope)) /** @internal */ export const runCollect = ( self: Channel.Channel ): Effect.Effect<[Chunk.Chunk, OutDone], OutErr, Env> => run(core.collectElements(self)) /** @internal */ export const runDrain = ( self: Channel.Channel ): Effect.Effect => run(drain(self)) /** @internal */ export const runScoped = ( self: Channel.Channel ): Effect.Effect => Effect.scopeWith((scope) => executor.runIn(self, scope)) /** @internal */ export const scoped = ( effect: Effect.Effect ): Channel.Channel> => unwrap( Effect.uninterruptibleMask((restore) => Effect.map(Scope.make(), (scope) => core.acquireReleaseOut( Effect.tapErrorCause( restore(Scope.extend(effect, scope)), (cause) => Scope.close(scope, Exit.failCause(cause)) ), (_, exit) => Scope.close(scope, exit) )) ) ) /** @internal */ export const scopedWith = ( f: (scope: Scope.Scope) => Effect.Effect ): Channel.Channel => unwrapScoped(Effect.map(Effect.scope, (scope) => core.flatMap(core.fromEffect(f(scope)), core.write))) /** @internal */ export const service = ( tag: Context.Tag ): Channel.Channel => core.fromEffect(tag) /** @internal */ export const serviceWith = (tag: Context.Tag) => ( f: (resource: Types.NoInfer) => OutDone ): Channel.Channel => map(service(tag), f) /** @internal */ export const serviceWithChannel = (tag: Context.Tag) => ( f: (resource: Types.NoInfer) => Channel.Channel ): Channel.Channel => core.flatMap(service(tag), f) /** @internal */ export const serviceWithEffect = (tag: Context.Tag) => ( f: (resource: Types.NoInfer) => Effect.Effect ): Channel.Channel => mapEffect(service(tag), f) /** @internal */ export const splitLines = (): Channel.Channel< Chunk.Chunk, Chunk.Chunk, Err, Err, Done, Done, never > => core.suspend(() => { let stringBuilder = "" let midCRLF = false const splitLinesChunk = (chunk: Chunk.Chunk): Chunk.Chunk => { const chunkBuilder: Array = [] Chunk.map(chunk, (str) => { if (str.length !== 0) { let from = 0 let indexOfCR = str.indexOf("\r") let indexOfLF = str.indexOf("\n") if (midCRLF) { if (indexOfLF === 0) { chunkBuilder.push(stringBuilder) stringBuilder = "" from = 1 indexOfLF = str.indexOf("\n", from) } else { stringBuilder = stringBuilder + "\r" } midCRLF = false } while (indexOfCR !== -1 || indexOfLF !== -1) { if (indexOfCR === -1 || (indexOfLF !== -1 && indexOfLF < indexOfCR)) { if (stringBuilder.length === 0) { chunkBuilder.push(str.substring(from, indexOfLF)) } else { chunkBuilder.push(stringBuilder + str.substring(from, indexOfLF)) stringBuilder = "" } from = indexOfLF + 1 indexOfLF = str.indexOf("\n", from) } else { if (str.length === indexOfCR + 1) { midCRLF = true indexOfCR = -1 } else { if (indexOfLF === indexOfCR + 1) { if (stringBuilder.length === 0) { chunkBuilder.push(str.substring(from, indexOfCR)) } else { stringBuilder = stringBuilder + str.substring(from, indexOfCR) chunkBuilder.push(stringBuilder) stringBuilder = "" } from = indexOfCR + 2 indexOfCR = str.indexOf("\r", from) indexOfLF = str.indexOf("\n", from) } else { indexOfCR = str.indexOf("\r", indexOfCR + 1) } } } } if (midCRLF) { stringBuilder = stringBuilder + str.substring(from, str.length - 1) } else { stringBuilder = stringBuilder + str.substring(from, str.length) } } }) return Chunk.unsafeFromArray(chunkBuilder) } const loop: Channel.Channel, Chunk.Chunk, Err, Err, Done, Done, never> = core .readWithCause({ onInput: (input: Chunk.Chunk) => { const out = splitLinesChunk(input) return Chunk.isEmpty(out) ? loop : core.flatMap(core.write(out), () => loop) }, onFailure: (cause) => stringBuilder.length === 0 ? core.failCause(cause) : core.flatMap(core.write(Chunk.of(stringBuilder)), () => core.failCause(cause)), onDone: (done) => stringBuilder.length === 0 ? core.succeed(done) : core.flatMap(core.write(Chunk.of(stringBuilder)), () => core.succeed(done)) }) return loop }) /** @internal */ export const toPubSub = ( pubsub: PubSub.PubSub>> ): Channel.Channel => toQueue(pubsub) /** @internal */ export const toPull = ( self: Channel.Channel ): Effect.Effect, OutErr, Env>, never, Env | Scope.Scope> => Effect.flatMap(Effect.scope, (scope) => toPullIn(self, scope)) /** @internal */ export const toPullIn = dual< (scope: Scope.Scope) => ( self: Channel.Channel ) => Effect.Effect, OutErr, Env>, never, Env>, ( self: Channel.Channel, scope: Scope.Scope ) => Effect.Effect, OutErr, Env>, never, Env> >(2, ( self: Channel.Channel, scope: Scope.Scope ) => Effect.zip( Effect.sync(() => new executor.ChannelExecutor(self, void 0, identity)), Effect.runtime() ).pipe( Effect.tap(([executor, runtime]) => Scope.addFinalizerExit(scope, (exit) => { const finalizer = executor.close(exit) return finalizer !== undefined ? Effect.provide(finalizer, runtime) : Effect.void }) ), Effect.uninterruptible, Effect.map(([executor]) => Effect.suspend(() => interpretToPull( executor.run() as ChannelState.ChannelState, executor ) ) ) )) /** @internal */ const interpretToPull = ( channelState: ChannelState.ChannelState, exec: executor.ChannelExecutor ): Effect.Effect, OutErr, Env> => { const state = channelState as ChannelState.Primitive switch (state._tag) { case ChannelStateOpCodes.OP_DONE: { return Exit.match(exec.getDone(), { onFailure: Effect.failCause, onSuccess: (done): Effect.Effect, OutErr, Env> => Effect.succeed(Either.left(done)) }) } case ChannelStateOpCodes.OP_EMIT: { return Effect.succeed(Either.right(exec.getEmit())) } case ChannelStateOpCodes.OP_FROM_EFFECT: { return pipe( state.effect as Effect.Effect, OutErr, Env>, Effect.flatMap(() => interpretToPull(exec.run() as ChannelState.ChannelState, exec)) ) } case ChannelStateOpCodes.OP_READ: { return executor.readUpstream( state, () => interpretToPull(exec.run() as ChannelState.ChannelState, exec), (cause) => Effect.failCause(cause) as Effect.Effect, OutErr, Env> ) } } } /** @internal */ export const toQueue = ( queue: Queue.Enqueue>> ): Channel.Channel => core.suspend(() => toQueueInternal(queue)) /** @internal */ const toQueueInternal = ( queue: Queue.Enqueue>> ): Channel.Channel => { return core.readWithCause({ onInput: (elem) => core.flatMap( core.fromEffect(Queue.offer(queue, Either.right(elem))), () => toQueueInternal(queue) ), onFailure: (cause) => core.fromEffect(Queue.offer(queue, Either.left(Exit.failCause(cause)))), onDone: (done) => core.fromEffect(Queue.offer(queue, Either.left(Exit.succeed(done)))) }) } /** @internal */ export const unwrap = ( channel: Effect.Effect, E, R> ): Channel.Channel => flatten(core.fromEffect(channel)) /** @internal */ export const unwrapScoped = ( self: Effect.Effect, E, R> ): Channel.Channel> => core.concatAllWith( scoped(self), (d, _) => d, (d, _) => d ) /** @internal */ export const unwrapScopedWith = ( f: (scope: Scope.Scope) => Effect.Effect, E, R> ): Channel.Channel => core.concatAllWith( scopedWith(f), (d, _) => d, (d, _) => d ) /** @internal */ export const updateService = dual< ( tag: Context.Tag, f: (resource: Types.NoInfer) => Types.NoInfer ) => ( self: Channel.Channel ) => Channel.Channel, ( self: Channel.Channel, tag: Context.Tag, f: (resource: Types.NoInfer) => Types.NoInfer ) => Channel.Channel >(3, ( self: Channel.Channel, tag: Context.Tag, f: (resource: Types.NoInfer) => Types.NoInfer ): Channel.Channel => mapInputContext(self, (context: Context.Context) => Context.merge( context, Context.make(tag, f(Context.unsafeGet(context, tag))) ))) /** @internal */ export const withSpan: { ( name: string, options?: Tracer.SpanOptions ): ( self: Channel.Channel ) => Channel.Channel> ( self: Channel.Channel, name: string, options?: Tracer.SpanOptions ): Channel.Channel> } = function() { const dataFirst = typeof arguments[0] !== "string" const name = dataFirst ? arguments[1] : arguments[0] const options = tracer.addSpanStackTrace(dataFirst ? arguments[2] : arguments[1]) const acquire = Effect.all([ Effect.makeSpan(name, options), Effect.context(), Effect.clock, FiberRef.get(FiberRef.currentTracerTimingEnabled) ]) if (dataFirst) { const self = arguments[0] return acquireUseRelease( acquire, ([span, context]) => core.provideContext(self, Context.add(context, tracer.spanTag, span)), ([span, , clock, timingEnabled], exit) => coreEffect.endSpan(span, exit, clock, timingEnabled) ) } return (self: Channel.Channel) => acquireUseRelease( acquire, ([span, context]) => core.provideContext(self, Context.add(context, tracer.spanTag, span)), ([span, , clock, timingEnabled], exit) => coreEffect.endSpan(span, exit, clock, timingEnabled) ) } as any /** @internal */ export const writeAll = ( ...outs: Array ): Channel.Channel => writeChunk(Chunk.fromIterable(outs)) /** @internal */ export const writeChunk = ( outs: Chunk.Chunk ): Channel.Channel => writeChunkWriter(0, outs.length, outs) /** @internal */ const writeChunkWriter = ( idx: number, len: number, chunk: Chunk.Chunk ): Channel.Channel => { return idx === len ? core.void : pipe( core.write(pipe(chunk, Chunk.unsafeGet(idx))), core.flatMap(() => writeChunkWriter(idx + 1, len, chunk)) ) } /** @internal */ export const zip = dual< ( that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, readonly [OutDone, OutDone1], InDone & InDone1, Env1 | Env >, ( self: Channel.Channel, that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, readonly [OutDone, OutDone1], InDone & InDone1, Env1 | Env > >( (args) => core.isChannel(args[1]), ( self: Channel.Channel, that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, readonly [OutDone, OutDone1], InDone & InDone1, Env | Env1 > => options?.concurrent ? mergeWith(self, { other: that, onSelfDone: (exit1) => mergeDecision.Await((exit2) => Effect.suspend(() => Exit.zip(exit1, exit2))), onOtherDone: (exit2) => mergeDecision.Await((exit1) => Effect.suspend(() => Exit.zip(exit1, exit2))) }) : core.flatMap(self, (a) => map(that, (b) => [a, b] as const)) ) /** @internal */ export const zipLeft = dual< ( that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, OutDone, InDone & InDone1, Env1 | Env >, ( self: Channel.Channel, that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, OutDone, InDone & InDone1, Env1 | Env > >( (args) => core.isChannel(args[1]), ( self: Channel.Channel, that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone, InDone & InDone1, Env | Env1 > => options?.concurrent ? map(zip(self, that, { concurrent: true }), (tuple) => tuple[0]) : core.flatMap(self, (z) => as(that, z)) ) /** @internal */ export const zipRight = dual< ( that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ) => ( self: Channel.Channel ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, OutDone1, InDone & InDone1, Env1 | Env >, ( self: Channel.Channel, that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ) => Channel.Channel< OutElem1 | OutElem, InElem & InElem1, OutErr1 | OutErr, InErr & InErr1, OutDone1, InDone & InDone1, Env1 | Env > >( (args) => core.isChannel(args[1]), ( self: Channel.Channel, that: Channel.Channel, options?: { readonly concurrent?: boolean | undefined } ): Channel.Channel< OutElem | OutElem1, InElem & InElem1, OutErr | OutErr1, InErr & InErr1, OutDone1, InDone & InDone1, Env | Env1 > => options?.concurrent ? map(zip(self, that, { concurrent: true }), (tuple) => tuple[1]) : core.flatMap(self, () => that) ) /** @internal */ export const ChannelExceptionTypeId: Channel.ChannelExceptionTypeId = Symbol.for( "effect/Channel/ChannelException" ) as Channel.ChannelExceptionTypeId /** @internal */ export const ChannelException = (error: E): Channel.ChannelException => ({ _tag: "ChannelException", [ChannelExceptionTypeId]: ChannelExceptionTypeId, error }) /** @internal */ export const isChannelException = (u: unknown): u is Channel.ChannelException => hasProperty(u, ChannelExceptionTypeId)