@@ -257,6 +257,7 @@ public CommonResponse<Object> adhoc(String code, Adhoc configure)
257257 targetConfigure .setUsername (Optional .ofNullable (initializerConfigure .getDataSetConfigure ().getUsername ()));
258258 targetConfigure .setPassword (Optional .ofNullable (initializerConfigure .getDataSetConfigure ().getPassword ()));
259259 targetConfigure .setDatabase (Optional .ofNullable (database ));
260+ targetConfigure .setInjector (injector );
260261 plugin .connect (targetConfigure );
261262 Response response = plugin .execute (sql );
262263 response .setContent (sql );
@@ -337,6 +338,7 @@ private Configure getConfigure(String database)
337338 targetConfigure .setUsername (Optional .ofNullable (initializerConfigure .getDataSetConfigure ().getUsername ()));
338339 targetConfigure .setPassword (Optional .ofNullable (initializerConfigure .getDataSetConfigure ().getPassword ()));
339340 targetConfigure .setDatabase (Optional .ofNullable (database ));
341+ targetConfigure .setInjector (injector );
340342 return targetConfigure ;
341343 }
342344
@@ -551,7 +553,7 @@ private void syncData(DataSetEntity entity, ExecutorService service)
551553 try {
552554 SourceEntity source = entity .getSource ();
553555 Optional <Plugin > pluginOptional = PluginUtils .getPluginByNameAndType (injector , source .getType (), source .getProtocol ());
554- if (! pluginOptional .isPresent ()) {
556+ if (pluginOptional .isEmpty ()) {
555557 throw new IllegalArgumentException (String .format ("Plugin [ %s ] not found" , initializerConfigure .getDataSetConfigure ().getType ()));
556558 }
557559
@@ -575,8 +577,10 @@ private void syncData(DataSetEntity entity, ExecutorService service)
575577 Properties inputProperties = ConfigureUtils .convertProperties (source , environment ,
576578 IConfigurePipelineType .INPUT , entity .getExecutor (), entity .getQuery (), inputFieldBody );
577579 Set <String > inputOptions = ConfigureUtils .convertOptions (source , environment , entity .getExecutor (), IConfigurePipelineType .INPUT );
580+ Configure inputConfigure = source .toConfigure ();
581+ inputConfigure .setInjector (injector );
578582 ExecutorConfigure input = new ExecutorConfigure (source .getType (), inputProperties , inputOptions , RunProtocol .valueOf (source .getProtocol ()),
579- inputPlugin , entity .getQuery (), database , entity .getTableName (), source . toConfigure () , originColumns );
583+ inputPlugin , entity .getQuery (), database , entity .getTableName (), inputConfigure , originColumns );
580584
581585 Plugin outputPlugin = PluginUtils .getPluginByNameAndType (injector , initializerConfigure .getDataSetConfigure ().getType (), PluginType .JDBC .name ()).orElseGet (null );
582586 SourceEntity outputSource = new SourceEntity ();
@@ -776,7 +780,7 @@ private Set<CreatedModel> createdModeProcess(Set<DataSetColumnEntity> targetColu
776780 Optional <DataSetColumnEntity > filterColumn = targetColumns .stream ()
777781 .filter (item -> item .getId ().equals (sourceColumn .getId ()))
778782 .findFirst ();
779- if (! filterColumn .isPresent ()) {
783+ if (filterColumn .isEmpty ()) {
780784 models .add (new CreatedModel (sourceColumn , CreatedMode .CREATE_COLUMN ));
781785 }
782786 else {
@@ -816,7 +820,9 @@ private boolean checkTableExists(DataSetEntity entity)
816820 try {
817821 Plugin plugin = getOutputPlugin ();
818822 SourceEntity source = getOutputSource ();
819- plugin .connect (source .toConfigure ());
823+ Configure configure = source .toConfigure ();
824+ configure .setInjector (injector );
825+ plugin .connect (configure );
820826 String sql = String .format ("SHOW CREATE TABLE `%s`.`%s`" , initializerConfigure .getDataSetConfigure ().getDatabase (), entity .getTableName ());
821827 log .info ("Check table exists for dataset [ {} ] id [ {} ] sql \n {}" , entity .getName (), entity .getId (), sql );
822828 Response response = plugin .execute (sql );
0 commit comments