// Repository web-view residue (not part of the Java source): topic-selection hint; file stats: 360 lines, 16 KiB, Java.
package com.ipsos.datasource.service;
// (web-view blame annotation: "5 years ago")
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.ipsos.base.domain.*;
import com.ipsos.datasource.constants.DatasourceTypes;
import com.ipsos.datasource.dto.*;
import com.ipsos.datasource.provider.DatasourceProvider;
import com.ipsos.datasource.provider.ProviderFactory;
import com.ipsos.dto.DatasourceDTO;
import com.ipsos.dto.dataset.DataTableInfoDTO;
import com.ipsos.base.domain.*;
import com.ipsos.base.mapper.*;
import com.ipsos.base.domain.*;
import com.ipsos.base.mapper.DatasetTableMapper;
import com.ipsos.base.mapper.DatasourceMapper;
import com.ipsos.base.mapper.ext.ExtDataSourceMapper;
import com.ipsos.base.mapper.ext.query.GridExample;
import com.ipsos.commons.exception.DEException;
import com.ipsos.commons.model.AuthURD;
import com.ipsos.commons.utils.AuthUtils;
import com.ipsos.commons.utils.CommonThreadPool;
import com.ipsos.commons.utils.LogUtil;
import com.ipsos.controller.ResultHolder;
import com.ipsos.controller.request.DatasourceUnionRequest;
import com.ipsos.controller.sys.base.BaseGridRequest;
import com.ipsos.controller.sys.base.ConditionEntity;
import com.ipsos.datasource.dto.*;
import com.ipsos.datasource.dto.*;
import com.ipsos.datasource.request.DatasourceRequest;
import com.ipsos.exception.DataEaseException;
import com.ipsos.i18n.Translator;
import com.ipsos.provider.QueryProvider;
import com.ipsos.service.dataset.DataSetGroupService;
import com.ipsos.service.message.DeMsgutil;
// (web-view blame annotation: "5 years ago")
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@Service
@Transactional(rollbackFor = Exception.class)
public class DatasourceService {
// Mapper used for CRUD on Datasource records (insert, select/update/delete by primary key, select by example).
@Resource
private DatasourceMapper datasourceMapper;
// Extended mapper backing the list queries of this service (queryUnion, query).
@Resource
private ExtDataSourceMapper extDataSourceMapper;
// Mapper used to look up the dataset tables that reference a datasource.
@Resource
private DatasetTableMapper datasetTableMapper;
// Used to resolve the parent groups of a dataset (getParents) when building a dataset path.
@Resource
private DataSetGroupService dataSetGroupService;
// Pool that runs the connection-pool maintenance tasks submitted through addTask.
@Resource
private CommonThreadPool commonThreadPool;
/**
 * Creates a new datasource record.
 *
 * <p>Checks the name for duplicates, assigns a random UUID as id, stamps the
 * creation/update time and the current user name, probes the connection to
 * set the status, inserts the record and finally schedules an "add" of its
 * connection pool.
 *
 * @param datasource the datasource to create; it is mutated in place
 * @return the same instance, populated with id, timestamps, creator and status
 */
public Datasource addDatasource(Datasource datasource) {
    checkName(datasource);

    final long now = System.currentTimeMillis();
    final String generatedId = UUID.randomUUID().toString();
    datasource.setId(generatedId);
    datasource.setCreateTime(now);
    datasource.setUpdateTime(now);

    final String creator = String.valueOf(AuthUtils.getUser().getUsername());
    datasource.setCreateBy(creator);

    checkAndUpdateDatasourceStatus(datasource);
    datasourceMapper.insertSelective(datasource);
    handleConnectionPool(datasource, "add");
    return datasource;
}
/**
 * Submits a task to the common thread pool that asks the datasource's
 * provider to handle its connection pool.
 *
 * <p>Any exception raised inside the task is logged and not propagated.
 *
 * @param datasource the datasource whose connection pool is affected
 * @param type       the operation passed through to the provider
 *                   (callers in this class use "add", "edit" and "delete")
 */
private void handleConnectionPool(Datasource datasource, String type) {
    commonThreadPool.addTask(() -> {
        try {
            DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
            DatasourceRequest datasourceRequest = new DatasourceRequest();
            datasourceRequest.setDatasource(datasource);
            datasourceProvider.handleDatasource(datasourceRequest, type);
            // Fixed the "Succsss" typo so the message can be found when searching logs.
            LogUtil.info("Success to {} datasource connection pool: {}", type, datasource.getName());
        } catch (Exception e) {
            LogUtil.error("Failed to handle datasource connection pool: " + datasource.getName(), e);
        }
    });
}
public List<DatasourceDTO> getDatasourceList(DatasourceUnionRequest request) throws Exception {
request.setSort("update_time desc");
List<DatasourceDTO> datasourceDTOS = extDataSourceMapper.queryUnion(request);
datasourceDTOS.forEach(datasourceDTO -> {
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasourceDTO.getType());
try {
5 years ago
switch (datasourceType) {
case mysql:
case mariadb:
case de_doris:
case ds_doris:
datasourceDTO.setConfiguration(JSONObject.toJSONString(new Gson().fromJson(datasourceDTO.getConfiguration(), MysqlConfiguration.class)));
5 years ago
break;
case sqlServer:
datasourceDTO.setConfiguration(JSONObject.toJSONString(new Gson().fromJson(datasourceDTO.getConfiguration(), SqlServerConfiguration.class)));
5 years ago
break;
case oracle:
datasourceDTO.setConfiguration(JSONObject.toJSONString(new Gson().fromJson(datasourceDTO.getConfiguration(), OracleConfiguration.class)));
5 years ago
break;
case pg:
datasourceDTO.setConfiguration(JSONObject.toJSONString(new Gson().fromJson(datasourceDTO.getConfiguration(), PgConfiguration.class)));
5 years ago
break;
case ck:
datasourceDTO.setConfiguration(JSONObject.toJSONString(new Gson().fromJson(datasourceDTO.getConfiguration(), CHConfiguration.class)));
5 years ago
break;
default:
break;
}
} catch (Exception ignore) {
}
5 years ago
});
return datasourceDTOS;
}
/**
 * Runs the grid query for datasources.
 *
 * <p>When the request carries no conditions, a default condition
 * (field "1" eq "1") is installed first. The current user's id is passed to
 * the query as the extend condition.
 *
 * @param request the grid request; its conditions may be replaced
 * @return the datasources returned by the grid query
 */
public List<DatasourceDTO> gridQuery(BaseGridRequest request) {
    // If no query condition was supplied, add a default one.
    if (CollectionUtils.isEmpty(request.getConditions())) {
        ConditionEntity defaultCondition = new ConditionEntity();
        defaultCondition.setField("1");
        defaultCondition.setOperator("eq");
        defaultCondition.setValue("1");
        request.setConditions(Arrays.asList(defaultCondition));
    }

    GridExample example = request.convertExample();
    String currentUserId = String.valueOf(AuthUtils.getUser().getUserId());
    example.setExtendCondition(currentUserId);
    return extDataSourceMapper.query(example);
}
public void deleteDatasource(String datasourceId) throws Exception {
DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andDataSourceIdEqualTo(datasourceId);
List<DatasetTable> datasetTables = datasetTableMapper.selectByExample(example);
if (CollectionUtils.isNotEmpty(datasetTables)) {
DataEaseException.throwException(datasetTables.size() + Translator.get("i18n_datasource_not_allow_delete_msg"));
5 years ago
}
Datasource datasource = datasourceMapper.selectByPrimaryKey(datasourceId);
datasourceMapper.deleteByPrimaryKey(datasourceId);
handleConnectionPool(datasource, "delete");
}
/**
 * Updates an existing datasource.
 *
 * <p>Checks the name for duplicates, refreshes the update time, clears the
 * create time on the passed object before the selective update, re-probes the
 * connection status, persists the record and schedules an "edit" of its
 * connection pool.
 *
 * @param datasource the datasource carrying the new values; it is mutated in place
 */
public void updateDatasource(Datasource datasource) {
    checkName(datasource);

    final long now = System.currentTimeMillis();
    datasource.setUpdateTime(now);
    datasource.setCreateTime(null);

    checkAndUpdateDatasourceStatus(datasource);
    datasourceMapper.updateByPrimaryKeySelective(datasource);
    handleConnectionPool(datasource, "edit");
}
public ResultHolder validate(Datasource datasource) throws Exception {
try {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.checkStatus(datasourceRequest);
return ResultHolder.success(datasourceProvider.getTables(datasourceRequest));
} catch (Exception e) {
5 years ago
return ResultHolder.error("Datasource is invalid: " + e.getMessage());
}
}
public ResultHolder validate(String datasourceId) {
Datasource datasource = datasourceMapper.selectByPrimaryKey(datasourceId);
if (datasource == null) {
return ResultHolder.error("Can not find datasource: " + datasourceId);
5 years ago
}
try {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.checkStatus(datasourceRequest);
datasource.setStatus("Success");
return ResultHolder.success(datasourceProvider.getTables(datasourceRequest));
} catch (Exception e) {
5 years ago
datasource.setStatus("Error");
return ResultHolder.error("Datasource is invalid: " + e.getMessage());
} finally {
5 years ago
datasourceMapper.updateByPrimaryKey(datasource);
}
}
/**
 * Asks the provider matching the datasource type for its schemas.
 *
 * @param datasource the datasource to inspect
 * @return the schema names reported by the provider
 * @throws Exception propagated from the provider
 */
public List<String> getSchema(Datasource datasource) throws Exception {
    final DatasourceProvider provider = ProviderFactory.getProvider(datasource.getType());
    final DatasourceRequest schemaRequest = new DatasourceRequest();
    schemaRequest.setDatasource(datasource);
    List<String> schemas = provider.getSchema(schemaRequest);
    return schemas;
}
public ResultHolder getTablesByDS(Datasource datasource) throws Exception {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.checkStatus(datasourceRequest);
return ResultHolder.success(datasourceProvider.getTables(datasourceRequest));
}
5 years ago
/**
 * Lists the tables of a stored datasource and marks which of them are
 * already used by a "db"-type dataset.
 *
 * <p>For a table that is already used, {@code enableCheck} is set to
 * {@code false} and {@code datasetPath} is set to the names of the dataset's
 * parent groups followed by the dataset name, separated by "/". When several
 * datasets use the same table, the first one returned by the mapper wins.
 *
 * @param datasource carries the id of the datasource to inspect
 * @return one entry per table reported by the provider
 * @throws Exception propagated from the status check or the table listing
 */
public List<DBTableDTO> getTables(Datasource datasource) throws Exception {
    Datasource ds = datasourceMapper.selectByPrimaryKey(datasource.getId());
    DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
    DatasourceRequest datasourceRequest = new DatasourceRequest();
    datasourceRequest.setDatasource(ds);
    datasourceProvider.checkStatus(datasourceRequest);
    List<String> tables = datasourceProvider.getTables(datasourceRequest);

    // Fetch the "db"-type datasets that belong to this datasource.
    DatasetTableExample datasetTableExample = new DatasetTableExample();
    datasetTableExample.createCriteria().andTypeEqualTo("db").andDataSourceIdEqualTo(datasource.getId());
    List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);

    // Parse each dataset's info once and index it by table name, instead of
    // re-parsing the JSON for every (table, dataset) pair.
    Gson gson = new Gson();
    Map<String, DatasetTable> datasetByTableName = new HashMap<>();
    for (DatasetTable datasetTable : datasetTables) {
        DataTableInfoDTO dataTableInfoDTO = gson.fromJson(datasetTable.getInfo(), DataTableInfoDTO.class);
        // Gson returns null for a null/empty document; such a dataset cannot match any table.
        if (dataTableInfoDTO != null) {
            // putIfAbsent keeps the first dataset per table, like the original first-match search.
            datasetByTableName.putIfAbsent(dataTableInfoDTO.getTable(), datasetTable);
        }
    }

    List<DBTableDTO> list = new ArrayList<>();
    for (String name : tables) {
        DBTableDTO dbTableDTO = new DBTableDTO();
        dbTableDTO.setDatasourceId(datasource.getId());
        dbTableDTO.setName(name);
        DatasetTable usedBy = datasetByTableName.get(name);
        if (usedBy == null) {
            dbTableDTO.setEnableCheck(true);
            dbTableDTO.setDatasetPath(null);
        } else {
            dbTableDTO.setEnableCheck(false);
            dbTableDTO.setDatasetPath(buildDatasetPath(usedBy));
        }
        list.add(dbTableDTO);
    }
    return list;
}

/**
 * Builds "parent1/parent2/.../datasetName" from the dataset's parent groups.
 */
private String buildDatasetPath(DatasetTable datasetTable) {
    List<DatasetGroup> parents = dataSetGroupService.getParents(datasetTable.getSceneId());
    StringBuilder path = new StringBuilder();
    for (DatasetGroup parent : parents) {
        if (ObjectUtils.isNotEmpty(parent)) {
            path.append(parent.getName()).append("/");
        }
    }
    path.append(datasetTable.getName());
    return path.toString();
}
/**
 * Loads a datasource by primary key.
 *
 * @param id primary key of the datasource
 * @return whatever the mapper returns for that key
 */
public Datasource get(String id) {
    Datasource found = datasourceMapper.selectByPrimaryKey(id);
    return found;
}
/**
 * Fetches the fields of a table by running the table's generated query
 * through the datasource provider.
 *
 * @param datasource the datasource that owns the table
 * @param tableName  the table whose fields are requested
 * @return the result fields reported by the provider
 * @throws Exception propagated from the providers
 */
public List<TableFiled> getFields(Datasource datasource, String tableName) throws Exception {
    final String type = datasource.getType();
    DatasourceProvider provider = ProviderFactory.getProvider(type);

    DatasourceRequest fieldRequest = new DatasourceRequest();
    fieldRequest.setDatasource(datasource);

    QueryProvider queryProvider = ProviderFactory.getQueryProvider(datasource.getType());
    String tableSql = queryProvider.convertTableToSql(tableName, datasource);
    fieldRequest.setQuery(tableSql);

    return provider.fetchResultField(fieldRequest);
}
public Map<String, List> getDataByDatasource(Datasource datasource, String tableName, Integer fetchSize) throws Exception {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
datasourceRequest.setQuery(qp.convertTableToSql(tableName, datasource));
datasourceRequest.setFetchSize(fetchSize);
datasourceRequest.setPageable(true);
datasourceRequest.setPreviewData(true);
return datasourceProvider.fetchResultAndField(datasourceRequest);
}
5 years ago
/**
 * Schedules an "add" of the connection pool for every stored datasource.
 *
 * <p>A failure to schedule one datasource is logged and does not stop the
 * remaining ones.
 */
public void initAllDataSourceConnectionPool() {
    List<Datasource> datasources = datasourceMapper.selectByExampleWithBLOBs(new DatasourceExample());
    for (Datasource datasource : datasources) {
        try {
            handleConnectionPool(datasource, "add");
        } catch (Exception e) {
            // Use the application logger rather than printStackTrace so the failure lands in the log files.
            LogUtil.error("Failed to init datasource connection pool: " + datasource.getName(), e);
        }
    }
}
/**
 * Rejects a datasource whose name is already used by another record.
 *
 * <p>When the datasource has a non-empty id, the record with that id is
 * excluded from the duplicate search.
 *
 * @param datasource the datasource whose name is checked
 */
private void checkName(Datasource datasource) {
    DatasourceExample sameName = new DatasourceExample();
    DatasourceExample.Criteria filter = sameName.createCriteria();
    filter.andNameEqualTo(datasource.getName());
    if (StringUtils.isNotEmpty(datasource.getId())) {
        filter.andIdNotEqualTo(datasource.getId());
    }

    List<Datasource> duplicates = datasourceMapper.selectByExample(sameName);
    if (CollectionUtils.isNotEmpty(duplicates)) {
        DEException.throwException(Translator.get("i18n_ds_name_exists"));
    }
}
public void updateDatasourceStatus() {
5 years ago
List<Datasource> datasources = datasourceMapper.selectByExampleWithBLOBs(new DatasourceExample());
datasources.forEach(datasource -> {
// checkAndUpdateDatasourceStatus(datasource);
checkAndUpdateDatasourceStatus(datasource, true);
});
}
private void checkAndUpdateDatasourceStatus(Datasource datasource) {
5 years ago
try {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.checkStatus(datasourceRequest);
datasource.setStatus("Success");
} catch (Exception e) {
datasource.setStatus("Error");
}
}
private void checkAndUpdateDatasourceStatus(Datasource datasource, Boolean withMsg) {
5 years ago
try {
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.checkStatus(datasourceRequest);
datasource.setStatus("Success");
datasourceMapper.updateByPrimaryKeySelective(datasource);
} catch (Exception e) {
Datasource temp = datasourceMapper.selectByPrimaryKey(datasource.getId());
datasource.setStatus("Error");
if (!StringUtils.equals(temp.getStatus(), "Error")) {
sendWebMsg(datasource);
datasourceMapper.updateByPrimaryKeySelective(datasource);
}
5 years ago
}
}
private void sendWebMsg(Datasource datasource) {
5 years ago
String id = datasource.getId();
AuthURD authURD = AuthUtils.authURDR(id);
Set<Long> userIds = AuthUtils.userIdsByURD(authURD);
Long typeId = 8L;// 代表数据源失效
Gson gson = new Gson();
userIds.forEach(userId -> {
Map<String, Object> param = new HashMap<>();
param.put("id", id);
param.put("name", datasource.getName());
5 years ago
String content = "数据源【" + datasource.getName() + "】无效";
5 years ago
DeMsgutil.sendMsg(userId, typeId, 1L, content, gson.toJson(param));
});
}
}