001/*
002 * Licensed to DuraSpace under one or more contributor license agreements.
003 * See the NOTICE file distributed with this work for additional information
004 * regarding copyright ownership.
005 *
006 * DuraSpace licenses this file to you under the Apache License,
007 * Version 2.0 (the "License"); you may not use this file except in
008 * compliance with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.fcrepo.search.impl;
019
020import static java.time.format.DateTimeFormatter.ISO_INSTANT;
021import static java.util.stream.Collectors.toList;
022import static org.fcrepo.common.db.DbPlatform.POSTGRESQL;
023import static org.fcrepo.search.api.Condition.Field.CONTENT_SIZE;
024import static org.fcrepo.search.api.Condition.Field.FEDORA_ID;
025import static org.fcrepo.search.api.Condition.Field.MIME_TYPE;
026import static org.fcrepo.search.api.Condition.Field.RDF_TYPE;
027
028import java.net.URI;
029import java.sql.ResultSet;
030import java.sql.SQLException;
031import java.sql.Timestamp;
032import java.sql.Types;
033import java.time.Instant;
034import java.time.temporal.ChronoUnit;
035import java.util.ArrayList;
036import java.util.Arrays;
037import java.util.Collections;
038import java.util.HashMap;
039import java.util.HashSet;
040import java.util.List;
041import java.util.Map;
042import java.util.Set;
043import java.util.concurrent.ConcurrentHashMap;
044
045import javax.annotation.PostConstruct;
046import javax.inject.Inject;
047import javax.sql.DataSource;
048
049import com.google.common.collect.Sets;
050import org.fcrepo.common.db.DbPlatform;
051import org.fcrepo.kernel.api.Transaction;
052import org.fcrepo.kernel.api.exception.RepositoryRuntimeException;
053import org.fcrepo.kernel.api.identifiers.FedoraId;
054import org.fcrepo.kernel.api.models.ResourceFactory;
055import org.fcrepo.kernel.api.models.ResourceHeaders;
056import org.fcrepo.search.api.Condition;
057import org.fcrepo.search.api.InvalidQueryException;
058import org.fcrepo.search.api.PaginationInfo;
059import org.fcrepo.search.api.SearchIndex;
060import org.fcrepo.search.api.SearchParameters;
061import org.fcrepo.search.api.SearchResult;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064import org.springframework.dao.DuplicateKeyException;
065import org.springframework.jdbc.core.RowMapper;
066import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
067import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
068import org.springframework.stereotype.Component;
069import org.springframework.transaction.annotation.Propagation;
070import org.springframework.transaction.annotation.Transactional;
071
072/**
 * A {@link SearchIndex} implementation that keeps the search index in relational database tables accessed via JDBC.
074 *
075 * @author dbernstein
076 * @author whikloj
077 */
078@Component("searchIndexImpl")
079public class DbSearchIndexImpl implements SearchIndex {
080    private static final Logger LOGGER = LoggerFactory.getLogger(DbSearchIndexImpl.class);
081
082    private static final String TRANSACTION_ID_COLUMN = "transaction_id";
083    private static final String SIMPLE_SEARCH_TABLE = "simple_search";
084    private static final String SIMPLE_SEARCH_TRANSACTIONS_TABLE = "simple_search_transactions";
085    private static final String SEARCH_RESOURCE_RDF_TYPE_TRANSACTIONS_TABLE = "search_resource_rdf_type_transactions";
086    public static final String SEARCH_RESOURCE_RDF_TYPE_TABLE = "search_resource_rdf_type";
087    public static final String SEARCH_RDF_TYPE_TABLE = "search_rdf_type";
088
089    private static final String FEDORA_ID_COLUMN = "fedora_id";
090    private static final String MODIFIED_COLUMN = "modified";
091    private static final String CREATED_COLUMN = "created";
092    private static final String CONTENT_SIZE_COLUMN = "content_size";
093    private static final String MIME_TYPE_COLUMN = "mime_type";
094    private static final String RESOURCE_ID_COLUMN = "resource_id";
095    public static final String RDF_TYPE_ID_COLUMN = "rdf_type_id";
096    public static final String ID_COLUMN = "id";
097    private static final String OPERATION_COLUMN = "operation";
098    private static final String RDF_TYPE_URI_COLUMN = "rdf_type_uri";
099
100    private static final String FEDORA_ID_PARAM = "fedora_id";
101    private static final String RESOURCE_ID_PARAM = "resource_id";
102    private static final String RDF_TYPE_ID_PARAM = "rdf_type_id";
103    private static final String MODIFIED_PARAM = "modified";
104    private static final String CONTENT_SIZE_PARAM = "content_size";
105    private static final String MIME_TYPE_PARAM = "mime_type";
106    private static final String CREATED_PARAM = "created";
107    public static final String RDF_TYPE_URI_PARAM = "rdf_type_uri";
108    public static final String RESOURCE_SEARCH_ID_PARAM = "resource_search_id";
109
110    public static final String TRANSACTION_ID_PARAM = "transaction_id";
111    private static final String OPERATION_PARAM = "operation";
112
113    private static final String RDF_TYPE_FILTER_SUB_TABLE = ", (SELECT rrt." + RESOURCE_ID_COLUMN + " from " +
114            SEARCH_RESOURCE_RDF_TYPE_TABLE + " rrt, " +
115            SEARCH_RDF_TYPE_TABLE + " rt, " + SIMPLE_SEARCH_TABLE + " s WHERE rrt.rdf_type_id = rt.id and s.id = " +
116            "rrt.resource_id and rt." + RDF_TYPE_URI_COLUMN + " like :" + RDF_TYPE_URI_PARAM +
117            " group by rrt." + RESOURCE_ID_COLUMN + ") r_filter";
118    private static final String RDF_TYPES_SUB_TABLE = ", (SELECT rrt.resource_id,  group_concat_function as rdf_type " +
119            " from " + SEARCH_RESOURCE_RDF_TYPE_TABLE + " rrt, " +
120            "search_rdf_type rt ," + SIMPLE_SEARCH_TABLE + " s " +
121            "WHERE rrt.rdf_type_id = rt.id group by rrt.resource_id) r ";
122
123    private static final String POSTGRES_GROUP_CONCAT_FUNCTION = "STRING_AGG(b.rdf_type_uri, ',')";
124    private static final String DEFAULT_GROUP_CONCAT_FUNCTION = "GROUP_CONCAT(distinct b.rdf_type_uri " +
125            "ORDER BY b.rdf_type_uri ASC SEPARATOR ',')";
126
127    private static final String UPSERT_SIMPLE_SEARCH_TRANSACTION_H2 =
128            "MERGE INTO " + SIMPLE_SEARCH_TRANSACTIONS_TABLE + " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " +
129                    CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + "," +
130                    FEDORA_ID_COLUMN + "," + OPERATION_COLUMN + ", " + TRANSACTION_ID_COLUMN +
131                    ") KEY (" + FEDORA_ID_COLUMN + ", " + TRANSACTION_ID_COLUMN + ") VALUES ( :" + MODIFIED_PARAM +
132                    ", :" + CREATED_PARAM + ", :" + CONTENT_SIZE_PARAM + ", :" + MIME_TYPE_PARAM + "," +
133                    ":" + FEDORA_ID_PARAM + ", :" + OPERATION_PARAM + ", :" + TRANSACTION_ID_PARAM + ")";
134
135    private static final String UPSERT_SIMPLE_SEARCH_H2 =
136            "MERGE INTO " + SIMPLE_SEARCH_TABLE + " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " +
137                    CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + "," +
138                    FEDORA_ID_COLUMN + ") KEY (" + FEDORA_ID_COLUMN + ") VALUES ( :" + MODIFIED_PARAM +
139                    ", :" + CREATED_PARAM + ", :" + CONTENT_SIZE_PARAM + ", :" + MIME_TYPE_PARAM + "," +
140                    ":" + FEDORA_ID_PARAM + ")";
141
142    private static final String UPSERT_SIMPLE_SEARCH_TRANSACTION_MYSQL_MARIA =
143            "INSERT INTO " + SIMPLE_SEARCH_TRANSACTIONS_TABLE + " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " +
144                    CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + "," +
145                    FEDORA_ID_COLUMN + "," + OPERATION_COLUMN + ", " + TRANSACTION_ID_COLUMN +
146                    ")  VALUES ( :" + MODIFIED_PARAM + ", :" + CREATED_PARAM + ", :" + CONTENT_SIZE_PARAM +
147                    ", :" + MIME_TYPE_PARAM + "," + ":" + FEDORA_ID_PARAM + ", :" + OPERATION_PARAM +
148                    ", :" + TRANSACTION_ID_PARAM + ") ON DUPLICATE KEY " +
149                    "UPDATE " + MODIFIED_COLUMN + " = VALUES(" + MODIFIED_COLUMN + "), " +
150                    CREATED_COLUMN + "= VALUES(" + CREATED_COLUMN + ")," +
151                    CONTENT_SIZE_COLUMN + "= VALUES(" + CONTENT_SIZE_COLUMN + ")," +
152                    MIME_TYPE_COLUMN + "= VALUES(" + MIME_TYPE_COLUMN + ")," +
153                    OPERATION_COLUMN + "= VALUES(" + OPERATION_COLUMN + ")";
154
155    private static final String UPSERT_SIMPLE_SEARCH_MYSQL_MARIA =
156            "INSERT INTO " + SIMPLE_SEARCH_TABLE + " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " +
157                    CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + "," +
158                    FEDORA_ID_COLUMN + ")  VALUES ( :" + MODIFIED_PARAM + ", :" + CREATED_PARAM +
159                    ", :" + CONTENT_SIZE_PARAM + ", :" + MIME_TYPE_PARAM + "," + ":" + FEDORA_ID_PARAM + ") " +
160                    "ON DUPLICATE KEY UPDATE " + MODIFIED_COLUMN + " = VALUES(" + MODIFIED_COLUMN + "), " +
161                    CREATED_COLUMN + "= VALUES(" + CREATED_COLUMN + ")," +
162                    CONTENT_SIZE_COLUMN + "= VALUES(" + CONTENT_SIZE_COLUMN + ")," +
163                    MIME_TYPE_COLUMN + "= VALUES(" + MIME_TYPE_COLUMN + ")";
164
165    private static final String UPSERT_SIMPLE_SEARCH_TRANSACTION_POSTGRESQL =
166            "INSERT INTO " + SIMPLE_SEARCH_TRANSACTIONS_TABLE + " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " +
167                    CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + "," +
168                    FEDORA_ID_COLUMN + "," + OPERATION_COLUMN + ", " + TRANSACTION_ID_COLUMN +
169                    ")  VALUES ( :" + MODIFIED_PARAM +
170                    ", :" + CREATED_PARAM + ", :" + CONTENT_SIZE_PARAM + ", :" + MIME_TYPE_PARAM + "," +
171                    ":" + FEDORA_ID_PARAM + ", :" + OPERATION_PARAM + ", :" + TRANSACTION_ID_PARAM + ") ON CONFLICT " +
172                    "( " + FEDORA_ID_COLUMN + ", " + TRANSACTION_ID_COLUMN + ") " +
173                    "DO UPDATE SET " + MODIFIED_COLUMN + " = EXCLUDED." + MODIFIED_COLUMN + ", " +
174                    CREATED_COLUMN + " = EXCLUDED." + CREATED_COLUMN + ", " +
175                    CONTENT_SIZE_COLUMN + " = EXCLUDED." + CONTENT_SIZE_COLUMN + ", " +
176                    MIME_TYPE_COLUMN + " = EXCLUDED." + MIME_TYPE_COLUMN + ", " +
177                    OPERATION_COLUMN + " = EXCLUDED." + OPERATION_COLUMN;
178
179    private static final String UPSERT_SIMPLE_SEARCH_POSTGRESQL =
180            "INSERT INTO " + SIMPLE_SEARCH_TABLE + " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " +
181                    CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + "," +
182                    FEDORA_ID_COLUMN + ")  VALUES ( :" + MODIFIED_PARAM +
183                    ", :" + CREATED_PARAM + ", :" + CONTENT_SIZE_PARAM + ", :" + MIME_TYPE_PARAM + "," +
184                    ":" + FEDORA_ID_PARAM + ") ON CONFLICT ( " + FEDORA_ID_COLUMN + ") " +
185                    "DO UPDATE SET " + MODIFIED_COLUMN + " = EXCLUDED." + MODIFIED_COLUMN + ", " +
186                    CREATED_COLUMN + " = EXCLUDED." + CREATED_COLUMN + ", " +
187                    CONTENT_SIZE_COLUMN + " = EXCLUDED." + CONTENT_SIZE_COLUMN + ", " +
188                    MIME_TYPE_COLUMN + " = EXCLUDED." + MIME_TYPE_COLUMN;
189
190    private static final String UPSERT_COMMIT_SIMPLE_SEARCH_H2 =
191            "MERGE INTO " + SIMPLE_SEARCH_TABLE +
192                    " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " + CONTENT_SIZE_COLUMN + "," +
193                    MIME_TYPE_COLUMN + ", " + FEDORA_ID_COLUMN +
194                    ") KEY (" + FEDORA_ID_COLUMN + ") SELECT " + MODIFIED_COLUMN + ", " + CREATED_COLUMN +
195                    ", " + CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + ", " + FEDORA_ID_COLUMN + " FROM " +
196                    SIMPLE_SEARCH_TRANSACTIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + "= :" +
197                    TRANSACTION_ID_PARAM + " AND " + OPERATION_COLUMN + "='add'";
198
199    private static final String UPSERT_COMMIT_SIMPLE_SEARCH_MYSQL_MARIA = "INSERT INTO " + SIMPLE_SEARCH_TABLE +
200            " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " + CONTENT_SIZE_COLUMN + "," +
201            MIME_TYPE_COLUMN + ", " + FEDORA_ID_COLUMN +
202            ") SELECT " + MODIFIED_COLUMN + ", " + CREATED_COLUMN +
203            ", " + CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + ", " + FEDORA_ID_COLUMN + " FROM " +
204            SIMPLE_SEARCH_TRANSACTIONS_TABLE + " a WHERE " + TRANSACTION_ID_COLUMN + "= :" +
205            TRANSACTION_ID_PARAM + " AND " + OPERATION_COLUMN + "='add' " +
206            "ON DUPLICATE KEY UPDATE " + MODIFIED_COLUMN + " = a." + MODIFIED_COLUMN + ", " +
207            CREATED_COLUMN + " = a." + CREATED_COLUMN + ", " +
208            CONTENT_SIZE_COLUMN + " = a." + CONTENT_SIZE_COLUMN + ", " +
209            MIME_TYPE_COLUMN + " = a." + MIME_TYPE_COLUMN;
210
211    private static final String UPSERT_COMMIT_SIMPLE_SEARCH_POSTGRESQL = "INSERT INTO " + SIMPLE_SEARCH_TABLE +
212            " (" + MODIFIED_COLUMN + "," + CREATED_COLUMN + ", " + CONTENT_SIZE_COLUMN + "," +
213            MIME_TYPE_COLUMN + ", " + FEDORA_ID_COLUMN +
214            ") SELECT " + MODIFIED_COLUMN + ", " + CREATED_COLUMN +
215            ", " + CONTENT_SIZE_COLUMN + "," + MIME_TYPE_COLUMN + ", " + FEDORA_ID_COLUMN + " FROM " +
216            SIMPLE_SEARCH_TRANSACTIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + "= :" +
217            TRANSACTION_ID_PARAM + " AND " + OPERATION_COLUMN + "='add' ON CONFLICT (" + FEDORA_ID_COLUMN + ") " +
218            "DO UPDATE SET " + MODIFIED_COLUMN + " = EXCLUDED." + MODIFIED_COLUMN + ", " +
219            CREATED_COLUMN + " = EXCLUDED." + CREATED_COLUMN + ", " +
220            CONTENT_SIZE_COLUMN + " = EXCLUDED." + CONTENT_SIZE_COLUMN + ", " +
221            MIME_TYPE_COLUMN + " = EXCLUDED." + MIME_TYPE_COLUMN;
222
223    private static final String COMMIT_RDF_TYPES =
224            "INSERT INTO " + SEARCH_RDF_TYPE_TABLE + " (" + RDF_TYPE_URI_COLUMN + ")" +
225                    " SELECT distinct " + RDF_TYPE_URI_COLUMN + " FROM " + SEARCH_RESOURCE_RDF_TYPE_TRANSACTIONS_TABLE +
226                    " WHERE " + TRANSACTION_ID_COLUMN + "= :" + TRANSACTION_ID_PARAM + " AND " + RDF_TYPE_URI_COLUMN +
227                    " NOT IN (SELECT " + RDF_TYPE_URI_PARAM + " FROM " + SEARCH_RDF_TYPE_TABLE + ")";
228
229    private static final String INSERT_RDF_TYPE =
230            "INSERT INTO " + SEARCH_RDF_TYPE_TABLE + " (" + RDF_TYPE_URI_COLUMN + ")" +
231                    " VALUES (:" + RDF_TYPE_URI_PARAM + ")";
232
233    private static final String INSERT_RDF_TYPE_POSTGRES =
234            "INSERT INTO " + SEARCH_RDF_TYPE_TABLE + " (" + RDF_TYPE_URI_COLUMN + ")" +
235                    " VALUES (:" + RDF_TYPE_URI_PARAM + ")" +
236                    " ON CONFLICT (" + RDF_TYPE_URI_COLUMN + ") DO NOTHING";
237
238    private static final String COMMIT_RDF_TYPE_ASSOCIATIONS =
239            "INSERT INTO " + SEARCH_RESOURCE_RDF_TYPE_TABLE +
240                    " (" + RESOURCE_ID_COLUMN + "," + RDF_TYPE_ID_COLUMN + ")" +
241                    " SELECT a." + ID_COLUMN + ", b." + ID_COLUMN + " FROM " + SIMPLE_SEARCH_TABLE + " a, " +
242                    SEARCH_RDF_TYPE_TABLE + " b, " + SEARCH_RESOURCE_RDF_TYPE_TRANSACTIONS_TABLE + " c WHERE c." +
243                    TRANSACTION_ID_COLUMN + "= :" + TRANSACTION_ID_PARAM + " AND b." + RDF_TYPE_URI_COLUMN +
244                    "= c." + RDF_TYPE_URI_COLUMN + " AND c." + FEDORA_ID_COLUMN + " = a." + FEDORA_ID_COLUMN +
245                    " GROUP BY a." + ID_COLUMN + ", b." + ID_COLUMN;
246
247    private static final String COMMIT_DELETE_RESOURCES_IN_TRANSACTION =
248            "DELETE FROM " + SIMPLE_SEARCH_TABLE + " WHERE " + FEDORA_ID_COLUMN + " IN (SELECT  " + FEDORA_ID_COLUMN +
249                    " FROM " + SIMPLE_SEARCH_TRANSACTIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + " = " +
250                    ":" + TRANSACTION_ID_PARAM + " AND " + OPERATION_COLUMN + " = 'delete')";
251
252    private static final String COMMIT_DELETE_RDF_TYPE_ASSOCIATIONS =
253            "DELETE FROM " + SEARCH_RESOURCE_RDF_TYPE_TABLE + " where " +
254                    RESOURCE_ID_COLUMN + " in (SELECT a." + ID_COLUMN + " FROM " + SIMPLE_SEARCH_TABLE + " a, " +
255                    SIMPLE_SEARCH_TRANSACTIONS_TABLE + " b " +
256                    " WHERE a." + FEDORA_ID_COLUMN + "= b." + FEDORA_ID_COLUMN + " AND b." + TRANSACTION_ID_COLUMN +
257                    "= :" + TRANSACTION_ID_PARAM + ")";
258
259    private static final String DELETE_TRANSACTION =
260            "DELETE FROM " + SIMPLE_SEARCH_TRANSACTIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + " = :" +
261                    TRANSACTION_ID_PARAM;
262
263    private static final String DELETE_RESOURCE_FROM_SEARCH =
264            "DELETE FROM " + SIMPLE_SEARCH_TABLE + " WHERE " + FEDORA_ID_COLUMN + " = :" +
265                    FEDORA_ID_PARAM;
266
267    private static final String DELETE_RDF_TYPE_ASSOCIATIONS_IN_TRANSACTION =
268            "DELETE FROM " + SEARCH_RESOURCE_RDF_TYPE_TRANSACTIONS_TABLE + " WHERE " + TRANSACTION_ID_COLUMN + " = :" +
269                    TRANSACTION_ID_PARAM;
270
271
272    private static final Map<DbPlatform, String> DIRECT_UPSERT_MAPPING = Map.of(
273            DbPlatform.H2, UPSERT_SIMPLE_SEARCH_H2,
274            DbPlatform.MYSQL, UPSERT_SIMPLE_SEARCH_MYSQL_MARIA,
275            DbPlatform.MARIADB, UPSERT_SIMPLE_SEARCH_MYSQL_MARIA,
276            DbPlatform.POSTGRESQL, UPSERT_SIMPLE_SEARCH_POSTGRESQL
277    );
278
279    private static final Map<DbPlatform, String> TRANSACTION_UPSERT_MAPPING = Map.of(
280            DbPlatform.H2, UPSERT_SIMPLE_SEARCH_TRANSACTION_H2,
281            DbPlatform.MYSQL, UPSERT_SIMPLE_SEARCH_TRANSACTION_MYSQL_MARIA,
282            DbPlatform.MARIADB, UPSERT_SIMPLE_SEARCH_TRANSACTION_MYSQL_MARIA,
283            DbPlatform.POSTGRESQL, UPSERT_SIMPLE_SEARCH_TRANSACTION_POSTGRESQL
284    );
285
286    private static final Map<DbPlatform, String> UPSERT_COMMIT_MAPPING = Map.of(
287            DbPlatform.H2, UPSERT_COMMIT_SIMPLE_SEARCH_H2,
288            DbPlatform.MYSQL, UPSERT_COMMIT_SIMPLE_SEARCH_MYSQL_MARIA,
289            DbPlatform.MARIADB, UPSERT_COMMIT_SIMPLE_SEARCH_MYSQL_MARIA,
290            DbPlatform.POSTGRESQL, UPSERT_COMMIT_SIMPLE_SEARCH_POSTGRESQL
291    );
292
293    /*
294     * Insert an association between a RDF type and a resource.
295     */
296    private static final String INSERT_RDF_TYPE_ASSOC_IN_TRANSACTION = "INSERT INTO " +
297            SEARCH_RESOURCE_RDF_TYPE_TRANSACTIONS_TABLE + " (" + FEDORA_ID_COLUMN + ", " + RDF_TYPE_URI_COLUMN + ", " +
298            TRANSACTION_ID_COLUMN + ") VALUES (:" + FEDORA_ID_PARAM + ", :" + RDF_TYPE_URI_PARAM + ", :" +
299            TRANSACTION_ID_PARAM + ")";
300
301    private static final String SELECT_RESOURCE_SEARCH_ID = "SELECT " + ID_COLUMN + " FROM " + SIMPLE_SEARCH_TABLE +
302            " WHERE " + FEDORA_ID_COLUMN + " = :" + FEDORA_ID_PARAM;
303
304    private static final String SELECT_RDF_TYPE_ID = "SELECT " + ID_COLUMN + " FROM " + SEARCH_RDF_TYPE_TABLE +
305            " WHERE " + RDF_TYPE_URI_COLUMN + "= :" + RDF_TYPE_URI_PARAM;
306
307    private static final String INSERT_RDF_TYPE_ASSOC = "INSERT INTO " + SEARCH_RESOURCE_RDF_TYPE_TABLE +
308            " (" + RESOURCE_ID_COLUMN + ", " + RDF_TYPE_ID_COLUMN + ")" +
309            " VALUES (:" + RESOURCE_SEARCH_ID_PARAM + ", :" + RDF_TYPE_ID_PARAM + ")";
310
311    private static final String DELETE_RESOURCE_TYPE_ASSOCIATIONS_IN_TRANSACTION =
312            "DELETE FROM " + SEARCH_RESOURCE_RDF_TYPE_TRANSACTIONS_TABLE + " WHERE " +
313                    FEDORA_ID_COLUMN + "= :" + FEDORA_ID_PARAM + " AND " + TRANSACTION_ID_COLUMN + "= :" +
314                    TRANSACTION_ID_PARAM;
315
316    private static final String DELETE_RDF_TYPE_ASSOCIATIONS =
317            "DELETE FROM " + SEARCH_RESOURCE_RDF_TYPE_TABLE + " WHERE " + RESOURCE_ID_COLUMN +
318                    " = (SELECT " + ID_COLUMN + " FROM " + SIMPLE_SEARCH_TABLE + " WHERE " +
319                    FEDORA_ID_COLUMN + " = :" + FEDORA_ID_PARAM + ")";
320
321    private static final List<String> COUNT_QUERY_COLUMNS = Arrays.asList("count(0) as count");
322
323    @Inject
324    private DataSource dataSource;
325
326    private NamedParameterJdbcTemplate jdbcTemplate;
327
328    @Inject
329    private ResourceFactory resourceFactory;
330
331    private DbPlatform dbPlatForm;
332
333    private final Map<URI, Long> rdfTypeIdCache;
334
335    /**
336     * Setup database table and connection
337     */
338    @PostConstruct
339    public void setup() {
340        this.dbPlatForm = DbPlatform.fromDataSource(this.dataSource);
341        this.jdbcTemplate = new NamedParameterJdbcTemplate(this.dataSource);
342    }
343
344    public DbSearchIndexImpl() {
345        this.rdfTypeIdCache = new ConcurrentHashMap<>();
346    }
347
348    @Override
349    public SearchResult doSearch(final SearchParameters parameters) throws InvalidQueryException {
350        //translate parameters into a SQL query
351        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
352        final var fields = parameters.getFields().stream().map(Condition.Field::toString).collect(toList());
353        final var selectQuery = createSearchQuery(parameters, parameterSource, fields, false);
354        final RowMapper<Map<String, Object>> rowMapper = createRowMapper(fields);
355
356        Integer totalResults = -1;
357        if (parameters.isIncludeTotalResultCount()) {
358            final var countQuery = createSearchQuery(parameters, parameterSource, Collections.emptyList(), true);
359            LOGGER.debug("countQuery={}, parameterSource={}", countQuery, parameterSource);
360            totalResults = jdbcTemplate.queryForObject(countQuery.toString(), parameterSource, Integer.class);
361        }
362
363        final var selectQueryStr = selectQuery.toString();
364        LOGGER.debug("selectQueryStr={}, parameterSource={}", selectQueryStr, parameterSource);
365
366        final List<Map<String, Object>> items = jdbcTemplate.query(selectQueryStr, parameterSource, rowMapper);
367        final var pagination = new PaginationInfo(parameters.getMaxResults(), parameters.getOffset(),
368                (totalResults != null ? totalResults : 0));
369        LOGGER.debug("Search query with parameters: {} - {}", selectQuery.toString(), parameters);
370        return new SearchResult(items, pagination);
371    }
372
373    private RowMapper<Map<String, Object>> createRowMapper(final List<String> fields) {
374        return new RowMapper<Map<String, Object>>() {
375            @Override
376            public Map<String, Object> mapRow(final ResultSet rs, final int rowNum) throws SQLException {
377                final Map<String, Object> map = new HashMap<>();
378                for (final String fieldStr : fields) {
379                    var value = rs.getObject(fieldStr);
380                    if (value instanceof Timestamp) {
381                        //format as iso instant if timestamp
382                        value = ISO_INSTANT.format(Instant.ofEpochMilli(((Timestamp) value).getTime()));
383                    } else if (fieldStr.equals(RDF_TYPE.toString())) {
384                        //convert the comma-separate string to an array for rdf_type
385                        value = value.toString().split(",");
386                    }
387                    map.put(fieldStr, value);
388                }
389                return map;
390            }
391        };
392    }
393
394    private StringBuilder createSearchQuery(final SearchParameters parameters,
395                                            final MapSqlParameterSource parameterSource,
396                                            final List<String> selectedFields, final boolean isCountQuery)
397            throws InvalidQueryException {
398
399        final var queryFields = new ArrayList<>(selectedFields);
400        final var fedoraIdStr = FEDORA_ID.toString();
401
402        if (!isCountQuery) {
403            if (!queryFields.contains(fedoraIdStr)) {
404                queryFields.add(0,fedoraIdStr);
405            }
406            queryFields.add(0,"id");
407        } else {
408            queryFields.add("count(0)");
409        }
410
411        final var whereClauses = new ArrayList<String>();
412        final var conditions = parameters.getConditions();
413        final var fields = new ArrayList<String>(queryFields);
414        final var rdfTypeConditionValue =
415                conditions.stream().filter(c -> c.getField().equals(RDF_TYPE)).findFirst().orElse(null);
416        final var returnRdfType = fields.stream().anyMatch(x -> x.equals(RDF_TYPE.toString()));
417        final var returnFields = fields.stream().filter(x -> !x.equals(RDF_TYPE.toString())).collect(toList());
418
419        final var sql = new StringBuilder("")
420                .append("SELECT ")
421                .append(String.join(",", returnFields));
422        sql.append(" FROM ")
423                .append(SIMPLE_SEARCH_TABLE).append(" s ");
424
425        if (rdfTypeConditionValue != null) {
426            final var rdfTypeOperator = rdfTypeConditionValue.getObject().contains("*") ? " LIKE " : " = ";
427            sql.append(", (SELECT ").append(RESOURCE_ID_COLUMN).append(" FROM ")
428                    .append(SEARCH_RESOURCE_RDF_TYPE_TABLE).append(" WHERE ")
429                    .append(RDF_TYPE_ID_COLUMN).append(" IN (").append("SELECT ID FROM ").append(SEARCH_RDF_TYPE_TABLE)
430                    .append(" WHERE ").append(RDF_TYPE_URI_COLUMN).append(rdfTypeOperator)
431                    .append(":").append(RDF_TYPE_URI_PARAM).append(")) rdf_type_filter ");
432            whereClauses.add("rdf_type_filter.resource_id = s.id");
433            addRdfTypeParam(parameterSource, conditions);
434        }
435
436        for (int i = 0; i < conditions.size(); i++) {
437            addWhereClause(i, parameterSource, whereClauses, conditions.get(i));
438        }
439
440        if (!whereClauses.isEmpty()) {
441            sql.append(" WHERE ");
442            for (final var it = whereClauses.iterator(); it.hasNext(); ) {
443                sql.append(it.next());
444                if (it.hasNext()) {
445                    sql.append(" AND ");
446                }
447            }
448        }
449
450        if (isCountQuery) {
451            return sql;
452        }
453
454        if (parameters.getOrderBy() != null) {
455            //add order by limit and offset to selectquery.
456            sql.append(" ORDER BY ").append(parameters.getOrderBy()).append(" ").append(parameters.getOrder());
457        }
458
459        sql.append(" LIMIT :limit OFFSET :offset");
460        parameterSource.addValue("limit", parameters.getMaxResults());
461        parameterSource.addValue("offset", parameters.getOffset());
462
463        if (!returnRdfType) {
464            return sql;
465        } else {
466            final var rdfTypeWrapperSql = new StringBuilder();
467            rdfTypeWrapperSql.append("SELECT a.*, ")
468                    .append(isPostgres() ? POSTGRES_GROUP_CONCAT_FUNCTION : DEFAULT_GROUP_CONCAT_FUNCTION)
469                    .append(" as rdf_type")
470                    .append(" FROM ")
471                    .append("(").append(sql).append(") a, ")
472                    .append("(SELECT rrt.resource_id , rt.rdf_type_uri FROM search_resource_rdf_type rrt, " +
473                            "search_rdf_type rt WHERE  rrt.rdf_type_id = rt.id) b ")
474                    .append("WHERE a.id = b.resource_id GROUP BY ").append(String.join(",", returnFields));
475
476            if (parameters.getOrderBy() != null) {
477                //add order by limit and offset to selectquery.
478                rdfTypeWrapperSql.append(" ORDER BY ").append(parameters.getOrderBy()).append(" ")
479                        .append(parameters.getOrder());
480            }
481
482            return rdfTypeWrapperSql;
483        }
484    }
485
486    private void addRdfTypeParam(final MapSqlParameterSource parameterSource, final List<Condition> conditions) {
487        var rdfTypeUriParamValue = "*";
488        for (final Condition condition : conditions) {
489            if (condition.getField().equals(RDF_TYPE)) {
490                rdfTypeUriParamValue = condition.getObject();
491                break;
492            }
493        }
494        parameterSource.addValue(RDF_TYPE_URI_PARAM, convertToSqlLikeWildcard(rdfTypeUriParamValue));
495    }
496
497    private void addWhereClause(final int paramCount, final MapSqlParameterSource parameterSource,
498                                final List<String> whereClauses,
499                                final Condition condition) throws InvalidQueryException {
500        final var field = condition.getField();
501        final var operation = condition.getOperator();
502        var object = condition.getObject();
503        final var paramName = "param" + paramCount;
504        if ((field.equals(FEDORA_ID) || field.equals(MIME_TYPE)) &&
505                condition.getOperator().equals(Condition.Operator.EQ)) {
506            if (!object.equals("*")) {
507                final String whereClause;
508                if (object.contains("*")) {
509                    object = convertToSqlLikeWildcard(object);
510                    if (object.contains("_")) {
511                        object = object.replaceAll("_", "\\\\_");
512                    }
513                    whereClause = field + " like :" + paramName;
514                } else {
515                    whereClause = field + " = :" + paramName;
516                }
517
518                whereClauses.add("s." +  whereClause);
519                parameterSource.addValue(paramName, object);
520            }
521        } else if (field.equals(Condition.Field.CREATED) || field.equals(Condition.Field.MODIFIED)) {
522            //parse date
523            try {
524                final var instant = InstantParser.parse(object);
525                whereClauses.add("s." + field + " " + operation.getStringValue() + " :" + paramName);
526                parameterSource.addValue(paramName, new Timestamp(instant.toEpochMilli()), Types.TIMESTAMP);
527            } catch (final Exception ex) {
528                throw new InvalidQueryException(ex.getMessage());
529            }
530        } else if (field.equals(CONTENT_SIZE)) {
531            try {
532                whereClauses.add(field + " " + operation.getStringValue() +
533                        " :" + paramName);
534                parameterSource.addValue(paramName, Long.parseLong(object), Types.INTEGER);
535            } catch (final Exception ex) {
536                throw new InvalidQueryException(ex.getMessage());
537            }
538        } else if (field.equals(RDF_TYPE) && condition.getOperator().equals(Condition.Operator.EQ) ) {
539           //allowed but no where clause added here.
540        } else {
541            throw new InvalidQueryException("Condition not supported: \"" + condition + "\"");
542        }
543    }
544
545    private String convertToSqlLikeWildcard(final String value) {
546        return value.replace("*", "%");
547    }
548
549    @Override
550    public void addUpdateIndex(final Transaction transaction, final ResourceHeaders resourceHeaders) {
551        final var fedoraId = resourceHeaders.getId();
552        if (fedoraId.isAcl() || fedoraId.isMemento()) {
553            LOGGER.debug("The search index does not include acls or mementos. Ignoring resource {}",
554                    fedoraId.getFullId());
555            return;
556        }
557        LOGGER.debug("Updating search index for {}", fedoraId);
558        transaction.doInTx(() -> {
559            if (!transaction.isShortLived()) {
560                doUpsertWithTransaction(transaction, resourceHeaders, fedoraId);
561            } else {
562                doDirectUpsert(transaction, resourceHeaders, fedoraId);
563            }
564        });
565
566    }
567
568    private void doDirectUpsert(final Transaction transaction, final ResourceHeaders resourceHeaders,
569                                final FedoraId fedoraId) {
570        final var fullId = fedoraId.getFullId();
571        try {
572            final var fedoraResource = resourceFactory.getResource(transaction, fedoraId);
573            doUpsertIntoSimpleSearch(fedoraId, resourceHeaders);
574            final var rdfTypes = new ArrayList<>(Sets.newHashSet(fedoraResource.getTypes()));
575            final var newTypes = insertRdfTypes(rdfTypes);
576            deleteRdfTypeAssociations(fedoraId);
577            insertRdfTypeAssociations(rdfTypes, newTypes, fedoraId);
578        } catch (final Exception e) {
579            throw new RepositoryRuntimeException("Failed add/updated the search index for : " + fullId, e);
580        }
581    }
582
583    /**
584     * Adds the list of RDF types to the db, if they aren't already there, and returns a set of types that were
585     * actually added.
586     *
587     * @param rdfTypes the types to attempt to add
588     * @return the types that were added
589     */
590    private Set<URI> insertRdfTypes(final List<URI> rdfTypes) {
591        final var addTypes = new HashSet<URI>();
592
593        rdfTypes.stream()
594                .filter(rdfType -> !rdfTypeIdCache.containsKey(rdfType))
595                .forEach(rdfType -> {
596                    try {
597                        final var params = new MapSqlParameterSource();
598                        params.addValue(RDF_TYPE_URI_PARAM, rdfType.toString());
599                        if (isPostgres()) {
600                            // weirdly, postgres spoils the entire tx on duplicate keys and must be handled differently
601                            jdbcTemplate.update(INSERT_RDF_TYPE_POSTGRES, params);
602                        } else {
603                            jdbcTemplate.update(INSERT_RDF_TYPE, params);
604                        }
605
606                        addTypes.add(rdfType);
607                    } catch (DuplicateKeyException e) {
608                        // ignore duplicate keys
609                    }
610                });
611
612        return addTypes;
613    }
614
615    private void doUpsertWithTransaction(final Transaction transaction, final ResourceHeaders resourceHeaders,
616                                         final FedoraId fedoraId) {
617        final var fullId = fedoraId.getFullId();
618        try {
619            final var txId = transaction.getId();
620            final var fedoraResource = resourceFactory.getResource(transaction, fedoraId);
621            doUpsertIntoTransactionTables(txId, fedoraId, resourceHeaders, "add");
622            // add rdf type associations to the rdf type association table
623            final var rdfTypes = Sets.newHashSet(fedoraResource.getTypes());
624            insertRdfTypeAssociationsInTransaction(rdfTypes, txId, fedoraId);
625        } catch (final Exception e) {
626            throw new RepositoryRuntimeException("Failed add/updated the search index for : " + fullId, e);
627        }
628    }
629
630    /**
631     * Do the upsert action to the transaction table.
632     *
633     * @param txId            the transaction id
634     * @param fedoraId        the resourceId
635     * @param resourceHeaders the resources headers
636     * @param operation       the operation to perform.
637     */
638    private void doUpsertIntoTransactionTables(final String txId, final FedoraId fedoraId,
639                                               final ResourceHeaders resourceHeaders, final String operation) {
640        var mimetype = "";
641        long contentSize = 0;
642        var modified = Instant.now();
643        var created = Instant.now();
644        if (resourceHeaders != null) {
645            contentSize = resourceHeaders.getContentSize();
646            mimetype = resourceHeaders.getMimeType();
647            modified = resourceHeaders.getLastModifiedDate();
648            created = resourceHeaders.getCreatedDate();
649        }
650
651        final var params = new MapSqlParameterSource();
652        params.addValue(FEDORA_ID_PARAM, fedoraId.getFullId());
653        params.addValue(MIME_TYPE_PARAM, mimetype);
654        params.addValue(CONTENT_SIZE_PARAM, contentSize);
655        params.addValue(CREATED_PARAM, formatInstant(created));
656        params.addValue(MODIFIED_PARAM, formatInstant(modified));
657        params.addValue(OPERATION_PARAM, operation);
658        params.addValue(TRANSACTION_ID_PARAM, txId);
659        jdbcTemplate.update(TRANSACTION_UPSERT_MAPPING.get(dbPlatForm), params);
660    }
661
662    /**
663     * Do direct upsert into simpl search table.
664     *
665     * @param fedoraId        the resourceId
666     * @param resourceHeaders the resources headers
667     */
668    private void doUpsertIntoSimpleSearch(final FedoraId fedoraId,
669                                          final ResourceHeaders resourceHeaders) {
670        var mimetype = "";
671        long contentSize = 0;
672        var modified = Instant.now();
673        var created = Instant.now();
674        if (resourceHeaders != null) {
675            contentSize = resourceHeaders.getContentSize();
676            mimetype = resourceHeaders.getMimeType();
677            modified = resourceHeaders.getLastModifiedDate();
678            created = resourceHeaders.getCreatedDate();
679        }
680
681        final var params = new MapSqlParameterSource();
682        params.addValue(FEDORA_ID_PARAM, fedoraId.getFullId());
683        params.addValue(MIME_TYPE_PARAM, mimetype);
684        params.addValue(CONTENT_SIZE_PARAM, contentSize);
685        params.addValue(CREATED_PARAM, formatInstant(created));
686        params.addValue(MODIFIED_PARAM, formatInstant(modified));
687        jdbcTemplate.update(DIRECT_UPSERT_MAPPING.get(dbPlatForm), params);
688    }
689
690    private Timestamp formatInstant(final Instant instant) {
691        if (instant == null) {
692            return null;
693        }
694        return Timestamp.from(instant.truncatedTo(ChronoUnit.SECONDS));
695    }
696
697    private void insertRdfTypeAssociationsInTransaction(final Set<URI> rdfTypes,
698                                                        final String txId,
699                                                        final FedoraId fedoraId) {
700        //remove and add type associations for the fedora id.
701        final List<MapSqlParameterSource> parameterSourcesList = new ArrayList<>(rdfTypes.size());
702        final var parameterSource = new MapSqlParameterSource();
703        parameterSource.addValue(TRANSACTION_ID_PARAM, txId);
704        parameterSource.addValue(FEDORA_ID_PARAM, fedoraId.getFullId());
705        jdbcTemplate.update(DELETE_RESOURCE_TYPE_ASSOCIATIONS_IN_TRANSACTION, parameterSource);
706
707        for (final var rdfType : rdfTypes) {
708            final var assocParams = new MapSqlParameterSource();
709            assocParams.addValue(TRANSACTION_ID_PARAM, txId);
710            assocParams.addValue(FEDORA_ID_PARAM, fedoraId.getFullId());
711            assocParams.addValue(RDF_TYPE_URI_PARAM, rdfType.toString());
712            parameterSourcesList.add(assocParams);
713        }
714        final MapSqlParameterSource[] psArray = parameterSourcesList.toArray(new MapSqlParameterSource[0]);
715        jdbcTemplate.batchUpdate(INSERT_RDF_TYPE_ASSOC_IN_TRANSACTION, psArray);
716    }
717
718    private void deleteRdfTypeAssociations(final FedoraId fedoraId) {
719        final var deleteParams = new MapSqlParameterSource();
720        deleteParams.addValue(FEDORA_ID_PARAM, fedoraId.getFullId());
721        jdbcTemplate.update(DELETE_RDF_TYPE_ASSOCIATIONS,
722                deleteParams);
723    }
724
725    private void insertRdfTypeAssociations(final List<URI> rdfTypes,
726                                           final Set<URI> newTypes,
727                                           final FedoraId fedoraId) {
728        //add rdf type associations
729
730        final var resourceSearchId = jdbcTemplate.queryForObject(
731                SELECT_RESOURCE_SEARCH_ID,
732                Map.of(FEDORA_ID_PARAM, fedoraId.getFullId()), Long.class);
733
734        final List<MapSqlParameterSource> parameterSourcesList = new ArrayList<>();
735        for (final var rdfType : rdfTypes) {
736            final Long rdfTypeId;
737            if (newTypes.contains(rdfType)) {
738                // The cache MUST NOT be used when the current TX created the record as it will not be committed yet
739                // and it will break other transactions.
740                rdfTypeId = getRdfTypeIdDirect(rdfType);
741            } else {
742                rdfTypeId = getRdfTypeIdCached(rdfType);
743            }
744
745            final var assocParams = new MapSqlParameterSource();
746            assocParams.addValue(RESOURCE_SEARCH_ID_PARAM, resourceSearchId);
747            assocParams.addValue(RDF_TYPE_ID_PARAM, rdfTypeId);
748            parameterSourcesList.add(assocParams);
749        }
750
751        final MapSqlParameterSource[] psArray = parameterSourcesList.toArray(new MapSqlParameterSource[0]);
752        jdbcTemplate.batchUpdate(INSERT_RDF_TYPE_ASSOC, psArray);
753    }
754
755    private Long getRdfTypeIdCached(final URI rdfType) {
756        return rdfTypeIdCache.computeIfAbsent(rdfType, this::getRdfTypeIdDirect);
757    }
758
759    private Long getRdfTypeIdDirect(final URI rdfType) {
760        return jdbcTemplate.queryForObject(
761                SELECT_RDF_TYPE_ID,
762                Map.of(RDF_TYPE_URI_PARAM, rdfType.toString()), Long.class);
763    }
764
765    @Override
766    public void removeFromIndex(final Transaction transaction, final FedoraId fedoraId) {
767        transaction.doInTx(() -> {
768            if (!transaction.isShortLived()) {
769                try {
770                    doUpsertIntoTransactionTables(transaction.getId(), fedoraId, null, "delete");
771                } catch (final Exception e) {
772                    throw new RepositoryRuntimeException("Failed to remove " + fedoraId + " from search index", e);
773                }
774            } else {
775                doDirectRemove(fedoraId);
776            }
777        });
778    }
779
    /**
     * Removes the resource directly from the search tables: first its rdf type associations,
     * then the resource row itself.
     *
     * @param fedoraId the id of the resource to remove
     */
    private void doDirectRemove(final FedoraId fedoraId) {
        // NOTE(review): associations are removed before the resource row, presumably because they
        // reference it -- confirm against the schema before reordering.
        deleteRdfTypeAssociations(fedoraId);
        deleteResource(fedoraId);
    }
784
785    private void deleteResource(final FedoraId fedoraId) {
786        final var params = new MapSqlParameterSource();
787        params.addValue(FEDORA_ID_PARAM, fedoraId.getFullId());
788        jdbcTemplate.update(DELETE_RESOURCE_FROM_SEARCH, params);
789    }
790
791    @Override
792    public void reset() {
793        rdfTypeIdCache.clear();
794
795        try (final var conn = this.dataSource.getConnection();
796             final var statement = conn.createStatement()) {
797            for (final var sql : toggleForeignKeyChecks(false)) {
798                statement.addBatch(sql);
799            }
800            statement.addBatch(truncateTable(SEARCH_RESOURCE_RDF_TYPE_TABLE));
801            statement.addBatch(truncateTable(SEARCH_RDF_TYPE_TABLE));
802            statement.addBatch(truncateTable(SIMPLE_SEARCH_TABLE));
803            statement.addBatch(truncateTable(SEARCH_RESOURCE_RDF_TYPE_TRANSACTIONS_TABLE));
804            statement.addBatch(truncateTable(SIMPLE_SEARCH_TRANSACTIONS_TABLE));
805            for (final var sql : toggleForeignKeyChecks(true)) {
806                statement.addBatch(sql);
807            }
808            statement.executeBatch();
809        } catch (final SQLException e) {
810            throw new RepositoryRuntimeException("Failed to truncate search index tables", e);
811        }
812    }
813
814    @Override
815    public void commitTransaction(final Transaction tx) {
816        if (!tx.isShortLived()) {
817            tx.ensureCommitting();
818            final var txId = tx.getId();
819            try {
820                final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
821                parameterSource.addValue(TRANSACTION_ID_PARAM, txId);
822                final int deletedAssociations = jdbcTemplate.update(COMMIT_DELETE_RDF_TYPE_ASSOCIATIONS,
823                        parameterSource);
824                final int deletedResources = jdbcTemplate.update(COMMIT_DELETE_RESOURCES_IN_TRANSACTION,
825                        parameterSource);
826                final int addedRdfTypes = jdbcTemplate.update(COMMIT_RDF_TYPES, parameterSource);
827                final int addedResources = jdbcTemplate.update(UPSERT_COMMIT_MAPPING.get(dbPlatForm),
828                        parameterSource);
829                final int addRdfTypeAssociations = jdbcTemplate.update(COMMIT_RDF_TYPE_ASSOCIATIONS, parameterSource);
830                cleanupTransaction(txId);
831                LOGGER.debug("Commit of tx {} complete with {} resource adds, {} resource associations adds, " +
832                                "{} rdf types adds{},  resource deletes, {} resource/rdf type associations deletes",
833                        txId, addedResources, addRdfTypeAssociations, addedRdfTypes, deletedResources,
834                        deletedAssociations);
835            } catch (final Exception e) {
836                LOGGER.warn("Unable to commit search index transaction {}: {}", txId, e.getMessage());
837                throw new RepositoryRuntimeException("Unable to commit search index transaction", e);
838            }
839        }
840    }
841
842    @Transactional(propagation = Propagation.NOT_SUPPORTED)
843    @Override
844    public void rollbackTransaction(final Transaction tx) {
845        if (!tx.isShortLived()) {
846            cleanupTransaction(tx.getId());
847        }
848    }
849
850    private void cleanupTransaction(final String txId) {
851        final MapSqlParameterSource parameterSource = new MapSqlParameterSource();
852        parameterSource.addValue(TRANSACTION_ID_PARAM, txId);
853        jdbcTemplate.update(DELETE_TRANSACTION, parameterSource);
854        jdbcTemplate.update(DELETE_RDF_TYPE_ASSOCIATIONS_IN_TRANSACTION, parameterSource);
855        LOGGER.debug("Transaction data has been removed from the search transaction tables for txId={} ", txId);
856    }
857
858    private List<String> toggleForeignKeyChecks(final boolean enable) {
859
860        if (isPostgres()) {
861            return Collections.emptyList();
862        } else {
863            return List.of("SET FOREIGN_KEY_CHECKS = " + (enable ? 1 : 0) + ";");
864        }
865    }
866
    /**
     * Tells whether the configured database platform is PostgreSQL.
     *
     * @return true when the platform equals {@code POSTGRESQL}
     */
    private boolean isPostgres() {
        return dbPlatForm.equals(POSTGRESQL);
    }
870
871    private String truncateTable(final String tableName) {
872        final var addCascade = isPostgres();
873        return "TRUNCATE TABLE " + tableName + (addCascade ? " CASCADE" : "") + ";";
874    }
875
876}