001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.fcrepo.kernel.impl.observer;
020
021import static com.google.common.base.Preconditions.checkNotNull;
022
023import java.net.URI;
024import java.util.Collections;
025import java.util.Map;
026import java.util.Set;
027import java.util.concurrent.ConcurrentHashMap;
028import java.util.stream.Collectors;
029
030import javax.inject.Inject;
031
032import org.fcrepo.config.AuthPropsConfig;
033import org.fcrepo.kernel.api.Transaction;
034import org.fcrepo.kernel.api.identifiers.FedoraId;
035import org.fcrepo.kernel.api.models.ResourceFactory;
036import org.fcrepo.kernel.api.observer.EventAccumulator;
037import org.fcrepo.kernel.api.operations.ResourceOperation;
038
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041import org.springframework.stereotype.Component;
042
043import com.google.common.collect.Multimap;
044import com.google.common.collect.MultimapBuilder;
045import com.google.common.eventbus.EventBus;
046
047/**
048 * @author pwinckles
049 */
050@Component
051public class EventAccumulatorImpl implements EventAccumulator {
052
053    private final static Logger LOG = LoggerFactory.getLogger(EventAccumulatorImpl.class);
054
055    private final Map<String, Multimap<FedoraId, EventBuilder>> transactionEventMap;
056
057    @Inject
058    private ResourceFactory resourceFactory;
059
060    @Inject
061    private EventBus eventBus;
062
063    @Inject
064    private AuthPropsConfig authPropsConfig;
065
066    public EventAccumulatorImpl() {
067        this.transactionEventMap = new ConcurrentHashMap<>();
068    }
069
070    @Override
071    public void recordEventForOperation(final Transaction transaction, final FedoraId fedoraId,
072                                        final ResourceOperation operation) {
073        checkNotNull(transaction, "transaction cannot be blank");
074        checkNotNull(fedoraId, "fedoraId cannot be null");
075
076        final String transactionId = transaction.getId();
077        final var events = transactionEventMap.computeIfAbsent(transactionId, key ->
078                MultimapBuilder.hashKeys().arrayListValues().build());
079        final var eventBuilder = ResourceOperationEventBuilder.fromResourceOperation(
080                fedoraId, operation, authPropsConfig.getUserAgentBaseUri());
081        events.put(fedoraId, eventBuilder);
082    }
083
084    @Override
085    public void emitEvents(final Transaction transaction, final String baseUrl, final String userAgent) {
086        LOG.debug("Emitting events for transaction {}", transaction.getId());
087
088        final var eventMap = transactionEventMap.remove(transaction.getId());
089
090        if (eventMap != null) {
091            eventMap.keySet().forEach(fedoraId -> {
092                final var events = eventMap.get(fedoraId);
093
094                try {
095                    final var mergedBuilder = events.stream()
096                            .reduce(EventBuilder::merge).get();
097
098                    final var event = mergedBuilder
099                            .withResourceTypes(loadResourceTypes(transaction, fedoraId))
100                            .withBaseUrl(baseUrl)
101                            .withUserAgent(userAgent)
102                            .build();
103
104                    LOG.debug("Emitting event: {}", event);
105                    eventBus.post(event);
106                } catch (final Exception e) {
107                    LOG.error("Failed to emit events: {}", events, e);
108                }
109            });
110        }
111    }
112
113    @Override
114    public void clearEvents(final Transaction transaction) {
115        LOG.trace("Clearing events for transaction {}", transaction.getId());
116        transactionEventMap.remove(transaction.getId());
117    }
118
119    private Set<String> loadResourceTypes(final Transaction transaction, final FedoraId fedoraId) {
120        try {
121            return resourceFactory.getResource(transaction, fedoraId).getTypes().stream()
122                    .map(URI::toString)
123                    .collect(Collectors.toSet());
124        } catch (final Exception e) {
125            LOG.debug("Could not load resource types for {}", fedoraId, e);
126            // This can happen if the resource no longer exists
127            return Collections.emptySet();
128        }
129    }
130
131}