[CDAP-20993] Periodic refresh implementation of source control metadata for SCM sync status #15626
base: develop
- Added periodic refresh of namespace and repository source control metadata.
- Source control metadata is refreshed automatically every 3 hours; the interval is configurable.
- The source control metadata can also be refreshed manually by calling the list APIs.
- If the time elapsed since the last refresh is within the buffer time of 10 minutes (configurable), the manual trigger is initiated.
- After a repo config is set, a manual trigger is initiated and the above check is skipped.
- If the refresh service for that namespace is already running, the refresh operation is skipped.
eb92835 to d8b506f
* It is used in filtering. | ||
*/ | ||
public enum SyncStatus { | ||
SYNCED, |
delete this
done
SourceControlMetadataRecord record = lastRecord.get(); | ||
String nextPageToken = !limitReachedAndLastRefreshPair.getKey() || record == null ? null : | ||
record.getName(); | ||
Long lastRefreshTime = limitReachedAndLastRefreshPair.getValue(); |
just expose a function in applicationLifecycleService to get the last refresh time
...p-fabric/src/main/java/io/cdap/cdap/gateway/handlers/SourceControlManagementHttpHandler.java
...c/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java
|
||
public void removeRefreshService(NamespaceId namespaceId) { | ||
if(refreshSchedulers.containsKey(namespaceId)) { | ||
refreshSchedulers.get(namespaceId).stop(); |
check if there is some Optional return
cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java
AppMetadataStore appMetadataStore = getAppMetadataStore(context); | ||
for (Map.Entry<ApplicationId, SourceControlMeta> updateRequest : updateRequests.entrySet()) { | ||
ApplicationId appId = updateRequest.getKey(); | ||
if (appMetadataStore.getApplication(appId) != null) { | ||
sourceControlMetadataStore.write(appId.getAppReference(), | ||
SourceControlMeta.builder(updateRequest.getValue()).setSyncStatus(true).build()); | ||
if (repoStore.get(appId.getAppReference()) != null) { |
you should always update repostore
TransactionRunners.run(transactionRunner, context -> { | ||
getNamespaceSourceControlMetadataStore(context).write(appRef, | ||
SourceControlMeta.builder(sourceControlMeta).setSyncStatus(true).build()); | ||
RepositorySourceControlMetadataStore repoStore = getRepoSourceControlMetadataStore(context); | ||
if (repoStore.get(appRef) != null) { |
same as above
getNamespaceSourceControlMetadataStore(context).write(id.getAppReference(), | ||
meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta() | ||
: SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true).build()); | ||
if (repoStore.get(appRef) != null) { | ||
repoStore.write(appRef, meta.getSourceControlMeta() != null, |
nit: Please add temporary variables to hold isSync, LastModified value. This way it would be easy to understand what is being passed to write()
+ namespaceId.getNamespace()); | ||
refreshSchedulers.get(namespaceId).triggerManualRefresh(false); | ||
} else { | ||
addRefreshService(namespaceId); |
this should always happen before the if
But addRefreshService should be called only if it is not already present in refreshSchedulers, right?
addRefreshService should use putIfAbsent; that would take care of the existing-scheduler issue.
...c/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java
* @param namespaceId The ID of the namespace for which to add the refresh service. | ||
*/ | ||
public void addRefreshService(NamespaceId namespaceId) { | ||
LOG.info("Adding refresh service for " + namespaceId.getNamespace()); |
Make this a debug log, and add it after the scheduler has been added.
*/ | ||
public void addRefreshService(NamespaceId namespaceId) { | ||
LOG.info("Adding refresh service for " + namespaceId.getNamespace()); | ||
if (!refreshSchedulers.containsKey(namespaceId)) { |
Use putIfAbsent. The current logic is not thread-safe.
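To illustrate the thread-safety concern, here is a minimal sketch of registering a per-namespace service with `ConcurrentMap.putIfAbsent` instead of a `containsKey` check followed by a `put`. `RefreshServiceStub` and `RefreshRegistry` are hypothetical stand-ins, not classes from this PR, and namespaces are modeled as plain strings.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the per-namespace refresh service.
final class RefreshServiceStub {
  private boolean started;

  synchronized void start() {
    started = true;
  }

  synchronized boolean isStarted() {
    return started;
  }
}

// Hypothetical registry keyed by namespace name.
final class RefreshRegistry {
  private final ConcurrentMap<String, RefreshServiceStub> refreshSchedulers =
      new ConcurrentHashMap<>();

  /** Returns true only for the caller that actually registered the service. */
  boolean addRefreshService(String namespace) {
    RefreshServiceStub candidate = new RefreshServiceStub();
    // putIfAbsent is atomic: it returns null only if this call inserted the candidate.
    RefreshServiceStub existing = refreshSchedulers.putIfAbsent(namespace, candidate);
    if (existing != null) {
      // Another caller registered first; the candidate is discarded without being started.
      return false;
    }
    candidate.start();
    return true;
  }

  RefreshServiceStub get(String namespace) {
    return refreshSchedulers.get(namespace);
  }

  int size() {
    return refreshSchedulers.size();
  }
}
```

With a separate `containsKey` check, two threads can both see the key as missing and both create and start a service; with `putIfAbsent` only one of them wins.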
LOG.info("Shutting down SourceControlManagementService"); | ||
|
||
for (NamespaceSourceControlMetadataRefreshService service : refreshSchedulers.values()) { | ||
service.stop(); |
stopAndWait()
LOG.info("Starting SourceControlMetadataRefreshManager"); | ||
|
||
List<NamespaceId> namespaceIds = namespaceAdmin.list().stream() | ||
.map(meta -> meta.getNamespaceId()).collect(Collectors.toList()); |
you can do
if (getRepositoryMeta(namespaceId) != null) {
addRefreshService(namespaceId);
}
here only. no need for a list
|
||
// Triggering source control metadata refresh service | ||
try { | ||
checkSourceControlMetadataAutoRefreshFlag(); |
Please do not do this. Otherwise, if the flag is false, the API would not work at all. Just skip the refresh if the flag is not enabled.
|
||
public SingleSourceControlMetadataResponse(SourceControlMetadataRecord record, | ||
Long lastRefreshTime) { | ||
this.record = record; |
Don't name it record; name it app instead, similar to how the list API calls this apps.
@Override | ||
public void runOneIteration() { | ||
|
||
running.set(true); |
Please call runRefreshService here.
if (!running.compareAndSet(false, true)) {
return;
}
runRefreshService(forced);
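The guard suggested above can be sketched in isolation as follows. This is a sketch under assumptions: `GuardedRefresh` and `tryRefresh` are hypothetical names, and resetting the flag in a `finally` block is an addition that is not shown in the snippet above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper showing a compareAndSet guard around a refresh body.
final class GuardedRefresh {
  private final AtomicBoolean running = new AtomicBoolean(false);
  private final AtomicInteger completedRuns = new AtomicInteger();

  /** Returns true if this call performed the refresh, false if one was already in progress. */
  boolean tryRefresh(Runnable refreshBody) {
    // Only the caller that flips the flag from false to true may proceed.
    if (!running.compareAndSet(false, true)) {
      return false;
    }
    try {
      refreshBody.run();
      completedRuns.incrementAndGet();
      return true;
    } finally {
      // Reset even if the body throws, so later refreshes are not blocked forever.
      running.set(false);
    }
  }

  int getCompletedRuns() {
    return completedRuns.get();
  }
}
```

Compared with a plain `running.set(true)`, the check and the update happen as one atomic step, so two concurrent callers cannot both enter the refresh.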
</property> | ||
<property> | ||
<name>source.control.metadata.refresh.buffer.seconds</name> | ||
<!-- <value>600</value>--> |
Please fix. Set it to 10 min by default
Please fix checkstyle
Please fix checkstyle
meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta() | ||
: SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true) | ||
.build()); | ||
if (repoStore.get(appRef) != null) { |
Should we not also write when meta.getSourceControlMeta() != null?
getNamespaceSourceControlMetadataStore(context).write(id.getAppReference(), | ||
meta.getSourceControlMeta() == null ? SourceControlMeta.createDefaultMeta() | ||
: SourceControlMeta.builder(meta.getSourceControlMeta()).setSyncStatus(true).build()); | ||
if (repoStore.get(appRef) != null) { |
Should we not also write when meta.getSourceControlMeta() != null?
Sure, I have changed it to:
    if (meta.getSourceControlMeta() != null) {
      Instant lastSyncedAt = meta.getSourceControlMeta().getLastSyncedAt();
      repoStore.write(appRef, true, lastSyncedAt == null ? null :
          lastSyncedAt.toEpochMilli());
    } else {
      ImmutablePair<Long, Boolean> lastSyncedAndStatusPair = repoStore.get(appRef);
      if (lastSyncedAndStatusPair != null) {
        repoStore.write(appRef, false, lastSyncedAndStatusPair.getFirst());
      }
    }
Three cases should be considered:
- Deploying an app in the namespace that has not been pulled or pushed before: the SCM meta is null, so we do not write anything to the repo table, since the app is not present there.
- Deploying an app that has already been pulled or pushed before: the record is present in the repo table, so we change the sync status but retain the last modified value.
- Pulling and deploying: the SCM meta is not null, so we set the values in the repo table.
I feel the previous implementation might also have worked, on the assumption that if we are pulling and deploying, or deploying an app that has been pulled or pushed before, the record would already be in the repo table.
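The three cases can be sketched as one small decision function over an in-memory table. Everything here is a hypothetical stand-in: `RepoSyncTable`, `Row`, and `onDeploy` are illustrative names, apps are keyed by plain strings, and the real store is transactional.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the repository source control metadata table.
final class RepoSyncTable {
  /** One row: sync status plus the last modified time (null when unknown). */
  static final class Row {
    final boolean synced;
    final Long lastModified;

    Row(boolean synced, Long lastModified) {
      this.synced = synced;
      this.lastModified = lastModified;
    }
  }

  private final Map<String, Row> rows = new HashMap<>();

  /**
   * Applies the three deployment cases.
   *
   * @param appName name of the app being deployed
   * @param hasScmMeta true when the deployment carries source control metadata (pull and deploy)
   * @param lastSyncedAtMillis last synced time from that metadata, may be null
   */
  void onDeploy(String appName, boolean hasScmMeta, Long lastSyncedAtMillis) {
    if (hasScmMeta) {
      // Pull and deploy: write the values and mark the app as synced.
      rows.put(appName, new Row(true, lastSyncedAtMillis));
      return;
    }
    Row existing = rows.get(appName);
    if (existing != null) {
      // Previously pulled or pushed: flip the sync status, keep the last modified time.
      rows.put(appName, new Row(false, existing.lastModified));
    }
    // Never pulled or pushed: no row exists, so nothing is written.
  }

  Row get(String appName) {
    return rows.get(appName);
  }
}
```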
...c/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java
public void addRefreshService(NamespaceId namespaceId) { | ||
NamespaceSourceControlMetadataRefreshService service = | ||
refreshSchedulers.putIfAbsent(namespaceId, createRefreshService(namespaceId)); | ||
if (service == null || isSourceControlMetadataPeriodicRefreshFlagEnabled()) { |
add the service only if flag is enabled
But it is possible that the user disables the periodic refresh flag but keeps manual refresh flag enabled
refreshSchedulers.get(namespaceId).start(); | ||
LOG.debug("Added Scheduled NamespaceSourceControlMetadataRefreshService for " | ||
+ namespaceId.getNamespace()); | ||
} else { |
Why are we triggering here again?
    public void addRefreshService(NamespaceId namespaceId) {
      NamespaceSourceControlMetadataRefreshService service =
          refreshSchedulers.putIfAbsent(namespaceId, createRefreshService(namespaceId));
      if (service == null || isSourceControlMetadataPeriodicRefreshFlagEnabled()) {
        refreshSchedulers.get(namespaceId).start();
      } else {
        refreshSchedulers.get(namespaceId).triggerManualRefresh(true);
      }
    }
If we are setting the repo config for the first time, this creates the refresh service, calls start(), and performs runOneIteration.
When we are updating the repo config, the service is already present in refreshSchedulers. If start() is called, it will see that the service has already started and will not call runOneIteration immediately. So we have to trigger the refresh manually so that the changes are reflected on the git pipelines page.
...c/main/java/io/cdap/cdap/internal/app/sourcecontrol/SourceControlMetadataRefreshService.java
protected void startUp() throws Exception { | ||
// This gives users an option to disable the source control metadata refresh service | ||
// for all namespaces | ||
if (this.runInterval <= 0) { |
remove this as we already have a feature flag
Merge only after requested changes are done
* or null if metadata is not found. | ||
* @throws IOException If an I/O error occurs. | ||
*/ | ||
public ImmutablePair get(ApplicationReference appRef) throws IOException { |
Why is this change needed? Why can't we use SourceControlMetadataRecord?
lastSyncedAt.toEpochMilli()); | ||
} | ||
else{ | ||
ImmutablePair<Long, Boolean> lastSyncedAndStatusPair = repoStore.get(appRef); |
Why can't we just use SourceControlMetadataRecord?
else{ | ||
ImmutablePair<Long, Boolean> lastSyncedAndStatusPair = repoStore.get(appRef); | ||
if(lastSyncedAndStatusPair != null) { | ||
repoStore.write(appRef, false, lastSyncedAndStatusPair.getFirst()); |
As mentioned before please add a separate function to toggle sync status only
*/ | ||
public Long getLastRefreshTime(NamespaceId namespaceId) { | ||
if (refreshSchedulers.containsKey(namespaceId)) { | ||
return refreshSchedulers.get(namespaceId).getLastRefreshTime(); |
This is not correct, as refreshSchedulers.get(namespaceId) can be null. Instead, store refreshSchedulers.get(namespaceId) in a temp variable and check whether it is null.
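A minimal sketch of the single-lookup pattern described here, assuming a concurrent map keyed by namespace name. `LastRefreshLookup` and `SchedulerStub` are hypothetical, and returning `0` when no scheduler is registered is an assumption made so the method can return a primitive `long`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical holder for per-namespace schedulers.
final class LastRefreshLookup {
  // Hypothetical stand-in for the per-namespace refresh service.
  static final class SchedulerStub {
    private final long lastRefreshTime;

    SchedulerStub(long lastRefreshTime) {
      this.lastRefreshTime = lastRefreshTime;
    }

    long getLastRefreshTime() {
      return lastRefreshTime;
    }
  }

  private final ConcurrentMap<String, SchedulerStub> refreshSchedulers =
      new ConcurrentHashMap<>();

  void register(String namespace, long lastRefreshTime) {
    refreshSchedulers.put(namespace, new SchedulerStub(lastRefreshTime));
  }

  void remove(String namespace) {
    refreshSchedulers.remove(namespace);
  }

  /** Returns the last refresh time in epoch millis, or 0 if no scheduler is registered. */
  long getLastRefreshTime(String namespace) {
    // Read the map once; a containsKey check followed by get can race with a concurrent remove.
    SchedulerStub scheduler = refreshSchedulers.get(namespace);
    return scheduler == null ? 0L : scheduler.getLastRefreshTime();
  }
}
```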
protected void shutDown() throws Exception { | ||
LOG.info("Shutting down SourceControlMetadataRefreshService"); | ||
|
||
for (NamespaceSourceControlMetadataRefreshService service : refreshSchedulers.values()) { |
nit: Use forEach
lastRecord.set(scmMetaRecord); | ||
}); | ||
} catch (IOException e) { | ||
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); |
Please throw the exception and let the HTTP exception handler handle the response. If you send the response here, you need to return from the catch block; otherwise the flow will continue.
} catch (IOException e) { | ||
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); | ||
} | ||
SourceControlMetadataRecord record = lastRecord.get(); |
The lastRecord variable seems unnecessary. The same can be achieved by apps.isEmpty() and apps.get(apps.size() - 1).
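A short sketch of deriving the next page token from the collected list instead of tracking the last record separately. `PageTokens` is a hypothetical helper, and app names stand in for the metadata records.

```java
import java.util.List;

// Hypothetical helper; app names stand in for SourceControlMetadataRecord instances.
final class PageTokens {
  private PageTokens() {
  }

  /** Returns the name of the last app when the page limit was reached, otherwise null. */
  static String nextPageToken(List<String> appNames, boolean pageLimitReached) {
    if (!pageLimitReached || appNames.isEmpty()) {
      return null;
    }
    // The last collected element is the same value a separate lastRecord variable would hold.
    return appNames.get(appNames.size() - 1);
  }
}
```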
SourceControlMetadataRecord record = lastRecord.get(); | ||
String nextPageToken = !pageLimitReached || record == null ? null : | ||
record.getName(); | ||
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId); |
This would execute another SQL query in another transaction, which may be inconsistent with what has been returned by the scanSourceControlMetadata call. I think it's better for the scan method to return a response that contains the apps, the page limit, and the last scan time. This makes sure all the data comes from the same transaction.
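One possible shape for such a response is sketched below. `ScanResponse` and `InMemoryScmStore` are hypothetical; the `synchronized` methods only play the role of a transaction boundary here, and treating the page-limit flag as "more rows remain beyond this page" is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical response holding everything a single scan produced.
final class ScanResponse {
  private final List<String> apps;
  private final boolean pageLimitReached;
  private final long lastRefreshTime;

  ScanResponse(List<String> apps, boolean pageLimitReached, long lastRefreshTime) {
    this.apps = Collections.unmodifiableList(new ArrayList<>(apps));
    this.pageLimitReached = pageLimitReached;
    this.lastRefreshTime = lastRefreshTime;
  }

  List<String> getApps() {
    return apps;
  }

  boolean isPageLimitReached() {
    return pageLimitReached;
  }

  long getLastRefreshTime() {
    return lastRefreshTime;
  }
}

// In-memory stand-in for the metadata store; each synchronized method models one transaction.
final class InMemoryScmStore {
  private final List<String> appNames = new ArrayList<>();
  private long lastRefreshTime;

  synchronized void refresh(List<String> names, long refreshTime) {
    appNames.clear();
    appNames.addAll(names);
    lastRefreshTime = refreshTime;
  }

  /** Reads the page, the limit flag, and the refresh time in one step. The limit must be >= 0. */
  synchronized ScanResponse scan(int limit) {
    int end = Math.min(limit, appNames.size());
    // Assumption: the flag means that rows remain beyond this page.
    boolean pageLimitReached = end < appNames.size();
    return new ScanResponse(appNames.subList(0, end), pageLimitReached, lastRefreshTime);
  }
}
```

Because the handler gets all three values from one call, it cannot observe apps from before a refresh together with a refresh time from after it.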
lastRecord.set(record); | ||
}); | ||
} catch (IOException e) { | ||
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage()); |
Same as above. Please throw. The old code was wrong too, please don't follow with that pattern.
SourceControlMetadataRecord record = lastRecord.get(); | ||
String nextPageToken = !pageLimitReached || record == null ? null : | ||
record.getName(); | ||
Long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId); |
Should be using primitive long instead.
|
||
// Triggering source control metadata refresh service | ||
if (isSourceControlMetadataManualRefreshFlagEnabled()) { | ||
sourceControlMetadataRefreshService.runRefreshService(namespaceId); |
Why is the refresh triggered only when there is a scan?
|
||
@Override | ||
protected final ScheduledExecutorService executor() { | ||
executor = Executors.newSingleThreadScheduledExecutor( |
So we have one thread per namespace for the refresh? That does not seem like an efficient use of threads. Ideally we only need a small scheduled thread pool for the refresh of all namespaces.
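As a rough sketch of that alternative, a single small `ScheduledExecutorService` can run the refresh tasks of many namespaces. `SharedRefreshPool` is a hypothetical wrapper; the pool size and intervals are illustrative, and this does not show how it would plug into `AbstractScheduledService`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper around one scheduled pool shared by all namespaces.
final class SharedRefreshPool implements AutoCloseable {
  private final ScheduledExecutorService executor;

  SharedRefreshPool(int threads) {
    this.executor = Executors.newScheduledThreadPool(threads);
  }

  /** Schedules one namespace's refresh task on the shared pool. */
  void schedule(Runnable refresh, long initialDelayMillis, long intervalMillis) {
    // scheduleWithFixedDelay never runs the same task concurrently with itself.
    executor.scheduleWithFixedDelay(
        refresh, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() {
    executor.shutdownNow();
  }
}
```

The thread count then stays fixed as namespaces are added, at the cost of refreshes queuing behind each other when more tasks are due than there are threads.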
// TODO(CDAP-21017): Optimize periodic refresh of source control metadata | ||
@Override | ||
public void runOneIteration() { | ||
if (!running.compareAndSet(false, true)) { |
You shouldn't need an atomic boolean for this. The AbstractScheduledService should already provide the right guarantees about how scheduled runs execute.
|
||
private void runRefreshService(boolean forced) { | ||
running.set(true); | ||
Long refreshStartTime = System.currentTimeMillis(); |
Use primitive long.
|
||
try { | ||
|
||
if (!namespaceAdmin.exists(namespaceId)) { |
Everything that happens in one run should happen in the same transaction. Using the admin will create a different transaction boundary.
new NamespaceRepository(namespaceId, repoConfig)); | ||
|
||
// Cleaning up the repo source control metadata table | ||
HashSet<String> repoFileNames = repositoryAppsResponse.getApps().stream() |
Use the interface Set as the variable type.
lastRefreshTime.set(refreshStartTime); | ||
|
||
} catch (Exception e) { | ||
LOG.error("Failed to refresh source control metadata for namespace " |
Use the placeholder {} for logging instead of string concatenation. Also, this seems like a warning rather than an error.
@Override | ||
protected void startUp() throws Exception { | ||
LOG.info("Starting SourceControlMetadataRefreshService for namespace " | ||
+ namespaceId.getNamespace() + " with interval " + runInterval + " seconds"); |
Use the placeholder {} instead of string concatenation for logging.
} | ||
|
||
private void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash, | ||
Long refreshStartTime) { |
Should use primitive long.
.setNamespace(namespace) | ||
.setLimit(Integer.MAX_VALUE) | ||
.build(); | ||
ArrayList<SourceControlMetadataRecord> records = new ArrayList<>(); |
Use List as the type.
throws IOException { | ||
for (SourceControlMetadataRecord record : records) { | ||
if (!repoFiles.contains(record.getName())) { | ||
store.delete(new ApplicationReference(record.getNamespace(), record.getName())); |
What does store.delete actually do? I'm asking because this method is nested inside the transactional scan method.
return true; | ||
} | ||
|
||
public Long getLastRefreshTime(String namespace) { |
Should use primitive long.
Please address the top level comments first. I will take another pass after those are fixed.