implemented SystemEvent service - data model and APIs

This commit is contained in:
Dmitry Toptygin
2020-06-02 19:42:27 -04:00
parent 4085fc941f
commit 7d35435a96
49 changed files with 1589 additions and 1568 deletions

View File

@@ -8,7 +8,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,12 +17,10 @@ import com.telecominfraproject.wlan.core.model.pagination.ColumnAndSort;
import com.telecominfraproject.wlan.core.model.pagination.PaginationContext;
import com.telecominfraproject.wlan.core.model.pagination.PaginationResponse;
import com.telecominfraproject.wlan.core.model.pagination.SortOrder;
import com.telecominfraproject.wlan.datastore.exceptions.DsConcurrentModificationException;
import com.telecominfraproject.wlan.datastore.exceptions.DsEntityNotFoundException;
import com.telecominfraproject.wlan.datastore.inmemory.BaseInMemoryDatastore;
import com.telecominfraproject.wlan.systemevent.datastore.SystemEventDatastore;
import com.telecominfraproject.wlan.systemevent.models.SystemEventContainer;
import com.telecominfraproject.wlan.systemevent.models.SystemEventRecord;
import com.telecominfraproject.wlan.systemevent.models.SystemEventRecordKey;
/**
* @author dtoptygin
@@ -34,125 +31,74 @@ public class SystemEventDatastoreInMemory extends BaseInMemoryDatastore implemen
private static final Logger LOG = LoggerFactory.getLogger(SystemEventDatastoreInMemory.class);
private static final Map<Long, SystemEventContainer> idToSystemEventRecordMap = new ConcurrentHashMap<Long, SystemEventContainer>();
private static final AtomicLong systemEventRecordIdCounter = new AtomicLong();
@Override
public SystemEventContainer create(SystemEventContainer systemEventRecord) {
SystemEventContainer systemEventRecordCopy = systemEventRecord.clone();
long id = systemEventRecordIdCounter.incrementAndGet();
systemEventRecordCopy.setId(id);
systemEventRecordCopy.setCreatedTimestamp(System.currentTimeMillis());
systemEventRecordCopy.setLastModifiedTimestamp(systemEventRecordCopy.getCreatedTimestamp());
idToSystemEventRecordMap.put(id, systemEventRecordCopy);
LOG.debug("Stored SystemEventContainer {}", systemEventRecordCopy);
return systemEventRecordCopy.clone();
}
@Override
public SystemEventContainer get(long systemEventRecordId) {
LOG.debug("Looking up SystemEventContainer for id {}", systemEventRecordId);
SystemEventContainer systemEventRecord = idToSystemEventRecordMap.get(systemEventRecordId);
if(systemEventRecord==null){
LOG.debug("Cannot find SystemEventContainer for id {}", systemEventRecordId);
throw new DsEntityNotFoundException("Cannot find SystemEventContainer for id " + systemEventRecordId);
} else {
LOG.debug("Found SystemEventContainer {}", systemEventRecord);
}
return systemEventRecord.clone();
}
@Override
public SystemEventContainer getOrNull(long systemEventRecordId) {
LOG.debug("Looking up SystemEventContainer for id {}", systemEventRecordId);
SystemEventContainer systemEventRecord = idToSystemEventRecordMap.get(systemEventRecordId);
if(systemEventRecord==null){
LOG.debug("Cannot find SystemEventContainer for id {}", systemEventRecordId);
return null;
} else {
LOG.debug("Found SystemEventContainer {}", systemEventRecord);
}
return systemEventRecord.clone();
}
private static final Map<SystemEventRecordKey, SystemEventRecord> idToSystemEventRecordMap = new ConcurrentHashMap<SystemEventRecordKey, SystemEventRecord>();
@Override
public SystemEventContainer update(SystemEventContainer systemEventRecord) {
SystemEventContainer existingSystemEventRecord = get(systemEventRecord.getId());
public void create(SystemEventRecord systemEvent) {
if(existingSystemEventRecord.getLastModifiedTimestamp()!=systemEventRecord.getLastModifiedTimestamp()){
LOG.debug("Concurrent modification detected for SystemEventContainer with id {} expected version is {} but version in db was {}",
systemEventRecord.getId(),
systemEventRecord.getLastModifiedTimestamp(),
existingSystemEventRecord.getLastModifiedTimestamp()
);
throw new DsConcurrentModificationException("Concurrent modification detected for SystemEventContainer with id " + systemEventRecord.getId()
+" expected version is " + systemEventRecord.getLastModifiedTimestamp()
+" but version in db was " + existingSystemEventRecord.getLastModifiedTimestamp()
);
SystemEventRecord systemEventCopy = systemEvent.clone();
if(systemEventCopy.getEventTimestamp()==0) {
systemEventCopy.setEventTimestamp(System.currentTimeMillis());
}
SystemEventContainer systemEventRecordCopy = systemEventRecord.clone();
systemEventRecordCopy.setLastModifiedTimestamp(getNewLastModTs(systemEventRecord.getLastModifiedTimestamp()));
idToSystemEventRecordMap.put(systemEventRecordCopy.getId(), systemEventRecordCopy);
idToSystemEventRecordMap.put(new SystemEventRecordKey(systemEventCopy), systemEventCopy);
LOG.debug("Updated SystemEventContainer {}", systemEventRecordCopy);
LOG.debug("Stored SystemEventRecord {}", systemEventCopy);
return systemEventRecordCopy.clone();
}
@Override
public SystemEventContainer delete(long systemEventRecordId) {
SystemEventContainer systemEventRecord = get(systemEventRecordId);
idToSystemEventRecordMap.remove(systemEventRecord.getId());
LOG.debug("Deleted SystemEventContainer {}", systemEventRecord);
return systemEventRecord.clone();
}
@Override
public List<SystemEventContainer> get(Set<Long> systemEventRecordIdSet) {
List<SystemEventContainer> ret = new ArrayList<>();
if(systemEventRecordIdSet!=null && !systemEventRecordIdSet.isEmpty()) {
idToSystemEventRecordMap.forEach(
(id, c) -> {
if(systemEventRecordIdSet.contains(id)) {
ret.add(c.clone());
} }
);
public void create(List<SystemEventRecord> systemEvents) {
if(systemEvents==null || systemEvents.isEmpty()) {
return;
}
LOG.debug("Found SystemEventRecords by ids {}", ret);
return ret;
systemEvents.forEach(m -> create(m));
}
@Override
public void delete(int customerId, long equipmentId, long createdBeforeTimestamp) {
List<SystemEventRecordKey> keysToRemove = new ArrayList<>();
idToSystemEventRecordMap.keySet().forEach( k -> {
if(k.getCustomerId() == customerId && k.getEquipmentId() == equipmentId && k.getEventTimestamp() < createdBeforeTimestamp) {
keysToRemove.add(k);
}
});
keysToRemove.forEach(k -> idToSystemEventRecordMap.remove(k) );
LOG.debug("Deleted SystemEventRecord s for customer {} equipment {} createdBefore {}", customerId, equipmentId, createdBeforeTimestamp);
}
@Override
public PaginationResponse<SystemEventContainer> getForCustomer(int customerId,
final List<ColumnAndSort> sortBy, PaginationContext<SystemEventContainer> context) {
public void delete(long createdBeforeTimestamp) {
List<SystemEventRecordKey> keysToRemove = new ArrayList<>();
idToSystemEventRecordMap.keySet().forEach( k -> {
if(k.getEventTimestamp() < createdBeforeTimestamp) {
keysToRemove.add(k);
}
});
keysToRemove.forEach(k -> idToSystemEventRecordMap.remove(k) );
LOG.debug("Deleted {} SystemEventRecords createdBefore {}", keysToRemove.size(), createdBeforeTimestamp);
}
@Override
public PaginationResponse<SystemEventRecord> getForCustomer(long fromTime, long toTime, int customerId,
Set<Long> equipmentIds, Set<String> dataTypes,
List<ColumnAndSort> sortBy, PaginationContext<SystemEventRecord> context) {
if(context == null) {
context = new PaginationContext<>();
}
PaginationResponse<SystemEventContainer> ret = new PaginationResponse<>();
PaginationResponse<SystemEventRecord> ret = new PaginationResponse<>();
ret.setContext(context.clone());
if (ret.getContext().isLastPage()) {
@@ -160,34 +106,40 @@ public class SystemEventDatastoreInMemory extends BaseInMemoryDatastore implemen
return ret;
}
List<SystemEventContainer> items = new LinkedList<>();
List<SystemEventRecord> items = new LinkedList<>();
// apply filters and build the full result list first - inefficient, but ok for testing
for (SystemEventContainer mdl : idToSystemEventRecordMap.values()) {
for (SystemEventRecord mdl : idToSystemEventRecordMap.values()) {
if (mdl.getCustomerId() != customerId) {
continue;
if (mdl.getCustomerId() == customerId &&
(equipmentIds==null || equipmentIds.isEmpty() || equipmentIds.contains(mdl.getEquipmentId())) &&
(dataTypes==null || dataTypes.isEmpty() || dataTypes.contains(mdl.getDataType())) &&
mdl.getEventTimestamp() >= fromTime &&
mdl.getEventTimestamp() <= toTime
) {
items.add(mdl);
}
items.add(mdl);
}
// apply sortBy columns
Collections.sort(items, new Comparator<SystemEventContainer>() {
Collections.sort(items, new Comparator<SystemEventRecord>() {
@Override
public int compare(SystemEventContainer o1, SystemEventContainer o2) {
public int compare(SystemEventRecord o1, SystemEventRecord o2) {
if (sortBy == null || sortBy.isEmpty()) {
// sort ascending by id by default
return Long.compare(o1.getId(), o2.getId());
// sort ascending by event time stamp by default
return Long.compare(o1.getEventTimestamp(), o2.getEventTimestamp());
} else {
int cmp;
for (ColumnAndSort column : sortBy) {
switch (column.getColumnName()) {
case "id":
cmp = Long.compare(o1.getId(), o2.getId());
case "eventTimestamp":
cmp = Long.compare(o1.getEventTimestamp(), o2.getEventTimestamp());
break;
case "sampleStr":
cmp = o1.getSampleStr().compareTo(o2.getSampleStr());
case "equipmentId":
cmp = Long.compare(o1.getEquipmentId(), o2.getEquipmentId());
break;
case "dataType":
cmp = o1.getDataType().compareTo(o2.getDataType());
break;
default:
// skip unknown column
@@ -208,9 +160,10 @@ public class SystemEventDatastoreInMemory extends BaseInMemoryDatastore implemen
// find first item to add
int fromIndex = 0;
if (context.getStartAfterItem() != null) {
for (SystemEventContainer mdl : items) {
SystemEventRecordKey startAfterKey = new SystemEventRecordKey(context.getStartAfterItem());
for (SystemEventRecord mdl : items) {
fromIndex++;
if (mdl.getId() == context.getStartAfterItem().getId()) {
if (new SystemEventRecordKey(mdl).equals(startAfterKey)) {
break;
}
}
@@ -223,8 +176,8 @@ public class SystemEventDatastoreInMemory extends BaseInMemoryDatastore implemen
}
// copy page items into result
List<SystemEventContainer> selectedItems = new ArrayList<>(context.getMaxItemsPerPage());
for (SystemEventContainer mdl : items.subList(fromIndex, toIndexExclusive)) {
List<SystemEventRecord> selectedItems = new ArrayList<>(context.getMaxItemsPerPage());
for (SystemEventRecord mdl : items.subList(fromIndex, toIndexExclusive)) {
selectedItems.add(mdl.clone());
}
@@ -235,11 +188,18 @@ public class SystemEventDatastoreInMemory extends BaseInMemoryDatastore implemen
if(ret.getContext().getStartAfterItem()!=null) {
//this datastore is only interested in the last item's id, so we'll clear all other fields on the startAfterItem in the pagination context
SystemEventContainer newStartAfterItem = new SystemEventContainer();
newStartAfterItem.setId(ret.getContext().getStartAfterItem().getId());
SystemEventRecord newStartAfterItem = new SystemEventRecord();
SystemEventRecord oldStartAfterItem = ret.getContext().getStartAfterItem();
newStartAfterItem.setCustomerId(oldStartAfterItem.getCustomerId());
newStartAfterItem.setEquipmentId(oldStartAfterItem.getEquipmentId());
newStartAfterItem.setDataType(oldStartAfterItem.getDataType());
newStartAfterItem.setEventTimestamp(oldStartAfterItem.getEventTimestamp());
ret.getContext().setStartAfterItem(newStartAfterItem);
}
return ret;
}
}