12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102 |
- # -*- coding: utf-8 -*-
- """
- Tests for the file state
- """
- from __future__ import absolute_import, print_function, unicode_literals
- import errno
- import filecmp
- import logging
- import os
- import re
- import shutil
- import stat
- import sys
- import tempfile
- import textwrap
- import pytest
- import salt.serializers.configparser
- import salt.utils.data
- import salt.utils.files
- import salt.utils.json
- import salt.utils.path
- import salt.utils.platform
- import salt.utils.stringutils
- from salt.ext import six
- from salt.ext.six.moves import range
- from salt.utils.versions import LooseVersion as _LooseVersion
- from tests.support.case import ModuleCase
- from tests.support.helpers import (
- Webserver,
- dedent,
- destructiveTest,
- skip_if_not_root,
- with_system_user_and_group,
- with_tempdir,
- with_tempfile,
- )
- from tests.support.mixins import SaltReturnAssertsMixin
- from tests.support.runtests import RUNTIME_VARS
- from tests.support.unit import skipIf
# Module-level logger shared by the helper and the test cases in this file.
log = logging.getLogger(__name__)

# ``pwd`` and ``grp`` are optional here: an ImportError is tolerated and the
# outcome of each import is recorded in a flag.
try:
    import pwd
except ImportError:
    HAS_PWD = False
else:
    HAS_PWD = True

try:
    import grp
except ImportError:
    HAS_GRP = False
else:
    HAS_GRP = True

# Evaluated once at import time; used to branch on platform-specific behavior.
IS_WINDOWS = salt.utils.platform.is_windows()

# Small binary payload (the bytes begin with a GIF89a signature).
BINARY_FILE = b"GIF89a\x01\x00\x01\x00\x80\x00\x00\x05\x04\x04\x00\x00\x00,\x00\x00\x00\x00\x01\x00\x01\x00\x00\x02\x02D\x01\x00;"

# Names of the system user and group used by tests in this file.
TEST_SYSTEM_USER = "test_system_user"
TEST_SYSTEM_GROUP = "test_system_group"
def _test_managed_file_mode_keep_helper(testcase, local=False):
    """
    Shared body of the ``mode: keep`` tests.

    Manages ``<TMP>/scene33`` from the ``grail/scene33`` file, first with an
    explicit mode and then twice with ``mode="keep"`` after changing the mode
    of the source file, checking each time that the managed file picked up
    the source's mode.

    :param testcase: test case instance providing ``run_state``,
        ``run_function`` and the Salt return assertions
    :param local: when true, use the filesystem path of the source file
        instead of its ``salt://`` URL
    """
    target = os.path.join(RUNTIME_VARS.TMP, "scene33")
    source_fs_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
    if local:
        source = source_fs_path
    else:
        source = "salt://grail/scene33"

    # Remember the source file's current mode so it can be restored at the
    # end, whatever happens in between.
    original_source_mode = int(
        testcase.run_function("file.get_mode", [source_fs_path]), 8
    )

    # Start from a known explicit mode so that the later "keep" runs are
    # guaranteed to actually change the permissions of the managed file.
    ret = testcase.run_state(
        "file.managed", name=target, mode=oct(0o770), source=source,
    )
    if IS_WINDOWS:
        testcase.assertSaltFalseReturn(ret)
        return
    testcase.assertSaltTrueReturn(ret)

    try:
        # Two passes with different modes: even if the source file already
        # had the first mode, the second pass forces a visible update.
        for wanted_mode in (0o600, 0o644):
            os.chmod(source_fs_path, wanted_mode)
            ret = testcase.run_state(
                "file.managed", name=target, mode="keep", source=source,
            )
            testcase.assertSaltTrueReturn(ret)
            actual_mode = stat.S_IMODE(os.stat(target).st_mode)
            testcase.assertEqual(oct(actual_mode), oct(wanted_mode))
    finally:
        # Put the source file's mode back the way it was found.
        os.chmod(source_fs_path, original_source_mode)
- @pytest.mark.windows_whitelisted
- class FileTest(ModuleCase, SaltReturnAssertsMixin):
- """
- Validate the file state
- """
def _delete_file(self, path):
    """
    Remove ``path``. A missing file is silently ignored; any other
    ``OSError`` is logged and not re-raised.
    """
    try:
        os.remove(path)
    except OSError as err:
        if err.errno == errno.ENOENT:
            # Nothing to delete.
            return
        log.error("Failed to remove %s: %s", path, err)
def tearDown(self):
    """
    Delete the ``salt`` system user if it exists after a test.

    The user list is obtained with ``user.list_users``. When a collection
    of names comes back the user is matched exactly; any other reply falls
    back to the previous substring check on its string form.
    """
    user = "salt"
    existing = self.run_function("user.list_users")
    if isinstance(existing, (list, tuple, set)):
        # Exact membership: a substring test on the stringified list would
        # also fire for names that merely contain "salt" (e.g. "saltuser").
        present = user in existing
    else:
        # Unexpected reply shape; keep the historical behavior.
        present = user in str(existing)
    if present:
        self.run_function("user.delete", [user])
def test_symlink(self):
    """
    file.symlink: create ``<TMP>/symlink`` pointing at ``<TMP>/target``
    """
    link_path = os.path.join(RUNTIME_VARS.TMP, "symlink")
    target_path = os.path.join(RUNTIME_VARS.TMP, "target")

    if IS_WINDOWS:
        # Windows must have a source directory to link to
        if not os.path.isdir(target_path):
            os.mkdir(target_path)
        # Windows cannot create a symlink if it already exists
        if self.run_function("file.is_link", [link_path]):
            self.run_function("file.remove", [link_path])

    ret = self.run_state("file.symlink", name=link_path, target=target_path)
    self.assertSaltTrueReturn(ret)
def test_test_symlink(self):
    """
    file.symlink with ``test=True``: the state return is checked with
    ``assertSaltNoneReturn``
    """
    tmp_dir = RUNTIME_VARS.TMP
    link_path = os.path.join(tmp_dir, "symlink2")
    target_path = os.path.join(tmp_dir, "target")
    ret = self.run_state(
        "file.symlink", test=True, name=link_path, target=target_path
    )
    self.assertSaltNoneReturn(ret)
def test_absent_file(self):
    """
    file.absent: an existing regular file is removed
    """
    doomed = os.path.join(RUNTIME_VARS.TMP, "file_to_kill")
    # Create the file the state is expected to remove.
    with salt.utils.files.fopen(doomed, "w+") as handle:
        handle.write("killme")

    result = self.run_state("file.absent", name=doomed)

    self.assertSaltTrueReturn(result)
    self.assertFalse(os.path.isfile(doomed))
def test_absent_dir(self):
    """
    file.absent: an existing directory is removed
    """
    doomed_dir = os.path.join(RUNTIME_VARS.TMP, "dir_to_kill")
    # The directory may have been left behind by an earlier run; only
    # create it when it is not already there.
    already_there = os.path.isdir(doomed_dir)
    if not already_there:
        os.makedirs(doomed_dir)

    result = self.run_state("file.absent", name=doomed_dir)

    self.assertSaltTrueReturn(result)
    self.assertFalse(os.path.isdir(doomed_dir))
def test_absent_link(self):
    """
    file.absent: an existing symlink is removed
    """
    link = os.path.join(RUNTIME_VARS.TMP, "link_to_kill")
    link_target = "{0}.tgt".format(link)

    def _link_exists():
        # Ask the minion whether the path is currently a symlink.
        return self.run_function("file.is_link", [link])

    # Windows must have a source directory to link to
    if IS_WINDOWS and not os.path.isdir(link_target):
        os.mkdir(link_target)
    # Make sure there is a link for the state to remove.
    if not _link_exists():
        self.run_function("file.symlink", [link_target, link])

    ret = self.run_state("file.absent", name=link)
    try:
        self.assertSaltTrueReturn(ret)
        self.assertFalse(_link_exists())
    finally:
        # Clean up the link if the state failed to remove it.
        if _link_exists():
            self.run_function("file.remove", [link])
@with_tempfile()
def test_test_absent(self, name):
    """
    file.absent with ``test=True``: the file must still exist afterwards
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("killme")

    result = self.run_state("file.absent", test=True, name=name)

    self.assertSaltNoneReturn(result)
    # A test run must not have touched the file.
    self.assertTrue(os.path.isfile(name))
def test_managed(self):
    """
    file.managed: the managed file has the same content as its
    ``salt://grail/scene33`` source
    """
    name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
    src = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
    ret = self.run_state("file.managed", name=name, source="salt://grail/scene33")
    # Check the state result before touching the managed file, so a failed
    # state is reported as such instead of as an error opening ``name``.
    self.assertSaltTrueReturn(ret)
    with salt.utils.files.fopen(src, "r") as fp_:
        master_data = fp_.read()
    with salt.utils.files.fopen(name, "r") as fp_:
        minion_data = fp_.read()
    self.assertEqual(master_data, minion_data)
def test_managed_file_mode(self):
    """
    file.managed, correct file permissions

    On Windows the state is expected to fail with a message saying that
    the ``mode`` option is not supported.
    """
    desired_mode = 0o770
    name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")
    ret = self.run_state(
        "file.managed", name=name, mode="0770", source="salt://grail/scene33"
    )

    if IS_WINDOWS:
        expected = "The 'mode' option is not supported on Windows"
        self.assertEqual(ret[list(ret)[0]]["comment"], expected)
        self.assertSaltFalseReturn(ret)
        return

    # Assert the state result before stat-ing the file, so a failed state
    # is reported directly rather than as a missing-file error.
    self.assertSaltTrueReturn(ret)
    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
@skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
def test_managed_file_mode_keep(self):
    """
    file.managed with ``mode: keep`` and a ``salt://`` source; the work is
    done by the shared helper
    """
    _test_managed_file_mode_keep_helper(testcase=self, local=False)
@skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
def test_managed_file_mode_keep_local_source(self):
    """
    file.managed with ``mode: keep`` and a local filesystem path as the
    source; the work is done by the shared helper
    """
    _test_managed_file_mode_keep_helper(testcase=self, local=True)
def test_managed_file_mode_file_exists_replace(self):
    """
    file.managed, existing file with replace=True, change permissions

    The file is first managed with mode 0770, then managed again with
    ``replace=True`` and mode 0600; the mode on disk is checked after each
    run. On Windows the first run is expected to fail because ``mode`` is
    not supported there.
    """
    initial_mode = 0o770
    desired_mode = 0o600
    name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")

    ret = self.run_state(
        "file.managed",
        name=name,
        mode=oct(initial_mode),
        source="salt://grail/scene33",
    )

    if IS_WINDOWS:
        expected = "The 'mode' option is not supported on Windows"
        self.assertEqual(ret[list(ret)[0]]["comment"], expected)
        self.assertSaltFalseReturn(ret)
        return

    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(initial_mode), oct(resulting_mode))

    ret = self.run_state(
        "file.managed",
        name=name,
        replace=True,
        mode=oct(desired_mode),
        source="salt://grail/scene33",
    )
    # Assert the state result before stat-ing the file, so a failed state
    # is reported directly.
    self.assertSaltTrueReturn(ret)
    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
def test_managed_file_mode_file_exists_noreplace(self):
    """
    file.managed, existing file with replace=False, change permissions

    The file is created with mode 0770, then managed again with
    ``replace=False`` and mode 0600; the mode on disk must follow the
    second run. On Windows the first run is expected to fail because
    ``mode`` is not supported there.
    """
    initial_mode = 0o770
    desired_mode = 0o600
    name = os.path.join(RUNTIME_VARS.TMP, "grail_scene33")

    ret = self.run_state(
        "file.managed",
        name=name,
        replace=True,
        mode=oct(initial_mode),
        source="salt://grail/scene33",
    )

    if IS_WINDOWS:
        expected = "The 'mode' option is not supported on Windows"
        self.assertEqual(ret[list(ret)[0]]["comment"], expected)
        self.assertSaltFalseReturn(ret)
        return

    ret = self.run_state(
        "file.managed",
        name=name,
        replace=False,
        mode=oct(desired_mode),
        source="salt://grail/scene33",
    )
    # Assert the state result before stat-ing the file, so a failed state
    # is reported directly.
    self.assertSaltTrueReturn(ret)
    resulting_mode = stat.S_IMODE(os.stat(name).st_mode)
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
def test_managed_file_with_grains_data(self):
    """
    Apply the ``file-grainget`` SLS and check that the file it writes
    (path passed in via pillar) starts with a ``minion`` line.
    """
    grain_path = os.path.join(RUNTIME_VARS.TMP, "file-grain-test")
    self.run_function(
        "state.sls", ["file-grainget"], pillar={"grain_path": grain_path}
    )
    self.assertTrue(os.path.exists(grain_path))

    with salt.utils.files.fopen(grain_path, "r") as handle:
        lines = handle.readlines()

    # The expected line ending differs between Windows and other platforms.
    pattern = "^minion\r\n" if IS_WINDOWS else "^minion\n"
    self.assertTrue(re.match(pattern, lines[0]))
def test_managed_file_with_pillar_sls(self):
    """
    Apply the ``file-pillarget`` SLS and check that the state succeeds and
    that ``<TMP>/filepillar-python`` exists afterwards.
    """
    target = os.path.join(RUNTIME_VARS.TMP, "filepillar-python")
    # Registered up front so the file is removed however the test ends.
    self.addCleanup(self._delete_file, target)
    log.warning("File Path: %s", target)

    result = self.run_function("state.sls", ["file-pillarget"])
    self.assertSaltTrueReturn(result)

    # Check to make sure the file was created
    self.assertTrue(self.run_function("file.file_exists", [target]))
def test_managed_file_with_pillardefault_sls(self):
    """
    Apply the ``file-pillardefaultget`` SLS and check that the state
    succeeds and that ``<TMP>/filepillar-defaultvalue`` exists afterwards.
    """
    target = os.path.join(RUNTIME_VARS.TMP, "filepillar-defaultvalue")
    # Registered up front so the file is removed however the test ends.
    self.addCleanup(self._delete_file, target)
    log.warning("File Path: %s", target)

    result = self.run_function("state.sls", ["file-pillardefaultget"])
    self.assertSaltTrueReturn(result)

    # Check to make sure the file was created
    self.assertTrue(self.run_function("file.file_exists", [target]))
@skip_if_not_root
def test_managed_dir_mode(self):
    """
    Tests to ensure that file.managed creates directories with the
    permissions requested with the dir_mode argument

    The parent directory ``<TMP>/a`` is created through ``makedirs=True``;
    its mode and owner are then compared with the requested values. On
    Windows the state is expected to fail because ``mode`` is not
    supported there.
    """
    desired_mode = 0o777
    desired_owner = "nobody"
    parent_dir = os.path.join(RUNTIME_VARS.TMP, "a")
    name = os.path.join(RUNTIME_VARS.TMP, "a", "managed_dir_mode_test_file")

    ret = self.run_state(
        "file.managed",
        name=name,
        source="salt://grail/scene33",
        mode=600,
        makedirs=True,
        user=desired_owner,
        dir_mode=oct(desired_mode),  # 0777
    )

    if IS_WINDOWS:
        expected = "The 'mode' option is not supported on Windows"
        self.assertEqual(ret[list(ret)[0]]["comment"], expected)
        self.assertSaltFalseReturn(ret)
        return

    # Assert the state result before stat-ing the directory, so a failed
    # state is reported directly rather than as a missing-path error.
    self.assertSaltTrueReturn(ret)
    resulting_mode = stat.S_IMODE(os.stat(parent_dir).st_mode)
    resulting_owner = pwd.getpwuid(os.stat(parent_dir).st_uid).pw_name
    self.assertEqual(oct(desired_mode), oct(resulting_mode))
    self.assertEqual(desired_owner, resulting_owner)
def test_test_managed(self):
    """
    file.managed with ``test=True``: the target file must not be created
    """
    target = os.path.join(RUNTIME_VARS.TMP, "grail_not_not_scene33")
    result = self.run_state(
        "file.managed", test=True, name=target, source="salt://grail/scene33"
    )
    self.assertSaltNoneReturn(result)
    # A test run must not have written the file.
    self.assertFalse(os.path.isfile(target))
def test_managed_show_changes_false(self):
    """
    file.managed with show_changes=False must report a placeholder in
    place of the diff
    """
    path = os.path.join(RUNTIME_VARS.TMP, "grail_not_scene33")
    # Pre-create the destination with known content
    with salt.utils.files.fopen(path, "wb") as handle:
        handle.write(b"test_managed_show_changes_false\n")

    ret = self.run_state(
        "file.managed", name=path, source="salt://grail/scene33", show_changes=False
    )
    state_ret = next(six.itervalues(ret))
    self.assertEqual("<show_changes=False>", state_ret["changes"]["diff"])
def test_managed_show_changes_true(self):
    """
    file.managed without show_changes set must include a diff in the
    returned changes
    """
    path = os.path.join(RUNTIME_VARS.TMP, "grail_not_scene33")
    # Pre-create the destination with known content
    with salt.utils.files.fopen(path, "wb") as handle:
        handle.write(b"test_managed_show_changes_false\n")

    ret = self.run_state("file.managed", name=path, source="salt://grail/scene33")
    state_ret = next(six.itervalues(ret))
    self.assertIn("diff", state_ret["changes"])
@skipIf(IS_WINDOWS, "Don't know how to fix for Windows")
def test_managed_escaped_file_path(self):
    """
    file.managed test that 'salt://|' protects unusual characters in file path
    """
    state_name = "funny_file"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_name + ".sls")

    # Destination file with punctuation and spaces in its name
    funny_file = salt.utils.files.mkstemp(
        prefix="?f!le? n@=3&", suffix=".file type"
    )
    funny_file_name = os.path.basename(funny_file)
    funny_url = "salt://|" + funny_file_name
    funny_url_path = os.path.join(RUNTIME_VARS.BASE_FILES, funny_file_name)
    state_key = "file_|-{0}_|-{0}_|-managed".format(funny_file)

    for leftover in (state_file, funny_file, funny_url_path):
        self.addCleanup(os.remove, leftover)

    # Empty source file stored in the base file root under the same name
    with salt.utils.files.fopen(funny_url_path, "w"):
        pass

    sls_template = """\
        {0}:
          file.managed:
            - source: {1}
            - makedirs: True
        """
    with salt.utils.files.fopen(state_file, "w") as sls_handle:
        sls_handle.write(
            textwrap.dedent(sls_template.format(funny_file, funny_url))
        )

    ret = self.run_function("state.sls", [state_name])
    self.assertTrue(ret[state_key]["result"])
def test_managed_contents(self):
    """
    test file.managed with contents that is a boolean, string, integer,
    float, list, and dictionary
    """
    state_name = "file-FileTest-test_managed_contents"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_name + ".sls")

    # One temporary destination file per YAML type under test
    managed_files = {}
    for typ in ("bool", "str", "int", "float", "list", "dict"):
        managed_files[typ] = salt.utils.files.mkstemp()
    state_keys = {
        typ: "file_|-{0} file_|-{1}_|-managed".format(typ, path)
        for typ, path in managed_files.items()
    }

    sls_template = """\
        bool file:
          file.managed:
            - name: {bool}
            - contents: True

        str file:
          file.managed:
            - name: {str}
            - contents: Salt was here.

        int file:
          file.managed:
            - name: {int}
            - contents: 340282366920938463463374607431768211456

        float file:
          file.managed:
            - name: {float}
            - contents: 1.7518e-45  # gravitational coupling constant

        list file:
          file.managed:
            - name: {list}
            - contents: [1, 1, 2, 3, 5, 8, 13]

        dict file:
          file.managed:
            - name: {dict}
            - contents:
                C: charge
                P: parity
                T: time
        """

    try:
        with salt.utils.files.fopen(state_file, "w") as sls_handle:
            sls_handle.write(textwrap.dedent(sls_template.format(**managed_files)))

        ret = self.run_function("state.sls", [state_name])
        self.assertSaltTrueReturn(ret)
        for key in state_keys.values():
            self.assertTrue(ret[key]["result"])
            self.assertIn("diff", ret[key]["changes"])
    finally:
        # Remove the SLS file and every destination file that still exists
        if os.path.exists(state_file):
            os.remove(state_file)
        for path in managed_files.values():
            if os.path.exists(path):
                os.remove(path)
def test_managed_contents_with_contents_newline(self):
    """
    test file.managed with contents and contents_newline=True: the file
    must end with a line separator
    """
    path = os.path.join(RUNTIME_VARS.TMP, "foo")
    contents = "test_managed_contents_with_newline_one"
    # Create a file named foo with contents as above but with a \n at EOF
    self.run_state(
        "file.managed", name=path, contents=contents, contents_newline=True
    )
    with salt.utils.files.fopen(path, "r") as handle:
        written = handle.read()
    self.assertEqual(contents + os.linesep, written)
def test_managed_contents_with_contents_newline_false(self):
    """
    test file.managed with contents and contents_newline=False: the file
    must hold exactly the given contents
    """
    path = os.path.join(RUNTIME_VARS.TMP, "bar")
    contents = "test_managed_contents_with_newline_one"
    # Create a file named bar with contents as above and nothing appended
    self.run_state(
        "file.managed", name=path, contents=contents, contents_newline=False
    )
    with salt.utils.files.fopen(path, "r") as handle:
        written = handle.read()
    self.assertEqual(contents, written)
def test_managed_multiline_contents_with_contents_newline(self):
    """
    test file.managed with two-line contents and contents_newline=True:
    the file must end with a line separator
    """
    path = os.path.join(RUNTIME_VARS.TMP, "bar")
    contents = "this is a cookie{}this is another cookie".format(os.linesep)
    # Create a file named bar with contents as above plus a separator at EOF
    self.run_state(
        "file.managed", name=path, contents=contents, contents_newline=True
    )
    with salt.utils.files.fopen(path, "r") as handle:
        written = handle.read()
    self.assertEqual(contents + os.linesep, written)
def test_managed_multiline_contents_with_contents_newline_false(self):
    """
    test file.managed with two-line contents and contents_newline=False:
    the file must hold exactly the given contents
    """
    path = os.path.join(RUNTIME_VARS.TMP, "bar")
    contents = "this is a cookie{}this is another cookie".format(os.linesep)
    # Create a file named bar with contents as above and nothing appended
    self.run_state(
        "file.managed", name=path, contents=contents, contents_newline=False
    )
    with salt.utils.files.fopen(path, "r") as handle:
        written = handle.read()
    self.assertEqual(contents, written)
@skip_if_not_root
@skipIf(IS_WINDOWS, 'Windows does not support "mode" kwarg. Skipping.')
@skipIf(not salt.utils.path.which("visudo"), "sudo is missing")
def test_managed_check_cmd(self):
    """
    Test file.managed passing a basic check_cmd kwarg. See Issue #38111.
    """
    sudoers = "/tmp/sudoers"
    state_key = "file_|-{0}_|-{0}_|-managed".format(sudoers)
    # The group used alongside user root differs on macOS
    r_group = "wheel" if salt.utils.platform.is_darwin() else "root"

    try:
        ret = self.run_state(
            "file.managed",
            name=sudoers,
            user="root",
            group=r_group,
            mode=440,
            check_cmd="visudo -c -s -f",
        )
        self.assertSaltTrueReturn(ret)
        self.assertInSaltComment("Empty file", ret)
        expected_changes = {"new": "file /tmp/sudoers created", "mode": "0440"}
        self.assertEqual(ret[state_key]["changes"], expected_changes)
    finally:
        # Clean Up File
        if os.path.exists(sudoers):
            os.remove(sudoers)
def test_managed_local_source_with_source_hash(self):
    """
    Make sure that we enforce the source_hash even with local files

    The local source is tried both as a ``file://`` URL and as a bare
    path. Each form is exercised once with no destination file present and
    once with a destination file already in place.
    """
    name = os.path.join(RUNTIME_VARS.TMP, "local_source_with_source_hash")
    local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene33")
    actual_hash = "567fd840bf1548edc35c48eb66cdd78bfdfcccff"
    if IS_WINDOWS:
        # CRLF vs LF causes a different hash on windows
        actual_hash = "f658a0ec121d9c17088795afcc6ff3c43cb9842a"
    # Reverse the actual hash
    bad_hash = actual_hash[::-1]

    def remove_file():
        """
        Delete the destination file; a missing file is not an error
        """
        try:
            os.remove(name)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    def do_test(clean=False):
        """
        For each source form, check that a wrong source_hash is rejected
        without changes and that the right one is accepted. With
        clean=True the destination is removed after each form so the next
        form also starts without a destination file.
        """
        for proto in ("file://", ""):
            source = proto + local_path
            log.debug("Trying source %s", source)
            try:
                ret = self.run_state(
                    "file.managed",
                    name=name,
                    source=source,
                    source_hash="sha1={0}".format(bad_hash),
                )
                self.assertSaltFalseReturn(ret)
                ret = ret[next(iter(ret))]
                # Shouldn't be any changes
                self.assertFalse(ret["changes"])
                # Check that we identified a hash mismatch
                self.assertIn("does not match actual checksum", ret["comment"])

                ret = self.run_state(
                    "file.managed",
                    name=name,
                    source=source,
                    source_hash="sha1={0}".format(actual_hash),
                )
                self.assertSaltTrueReturn(ret)
            finally:
                if clean:
                    remove_file()

    remove_file()
    log.debug("Trying with nonexistant destination file")
    # clean=True keeps the destination absent for every source form; without
    # it the second form would see the file laid down by the first one.
    do_test(clean=True)

    log.debug("Trying with destination file already present")
    with salt.utils.files.fopen(name, "w"):
        pass
    try:
        do_test(clean=False)
    finally:
        remove_file()
def test_managed_local_source_does_not_exist(self):
    """
    Make sure that we exit gracefully when a local source doesn't exist
    """
    name = os.path.join(RUNTIME_VARS.TMP, "local_source_does_not_exist")
    missing_source = os.path.join(RUNTIME_VARS.BASE_FILES, "grail", "scene99")

    # Try the missing path both as a file:// URL and as a bare path
    for source in ("file://" + missing_source, missing_source):
        log.debug("Trying source %s", source)
        ret = self.run_state("file.managed", name=name, source=source)
        self.assertSaltFalseReturn(ret)
        state_ret = ret[next(iter(ret))]
        # Shouldn't be any changes
        self.assertFalse(state_ret["changes"])
        # The comment must say that the source is missing
        self.assertIn("does not exist", state_ret["comment"])
def test_managed_unicode_jinja_with_tojson_filter(self):
    """
    Using {{ varname }} with a list or dictionary which contains unicode
    types on Python 2 will result in Jinja rendering the "u" prefix on each
    string. This tests that using the "tojson" jinja filter will dump them
    to a format which can be successfully loaded by our YAML loader.

    The two rendered lines cover two problems PyYAML would hit without the
    "tojson" filter:

    1. A unicode string would be loaded as a literal including the leading
       "u" and the quotes (u'foo' becoming u"u'foo'") rather than as the
       unicode value itself. One of the strings contains non-ascii
       characters to confirm those are handled as well.

    2. A unicode string containing a colon (such as a URL) would raise a
       ScannerError, because the colon would be taken as a mapping
       delimiter.

    Dumping the data structure to JSON with "tojson" should produce an
    inline structure which is valid YAML and loads properly.
    """
    test_file = os.path.join(RUNTIME_VARS.TMP, "test-tojson.txt")
    apply_ret = self.run_function(
        "state.apply", mods="tojson", pillar={"tojson-file": test_file}
    )
    state_ret = apply_ret[next(iter(apply_ret))]
    assert state_ret["result"], state_ret

    with salt.utils.files.fopen(test_file, mode="rb") as handle:
        raw = handle.read()
    managed = salt.utils.stringutils.to_unicode(raw)

    # NOTE(review): whitespace in this literal is significant for the
    # comparison below - confirm it against the rendered tojson template.
    expected = dedent(
        """\
        Die Webseite ist https://saltstack.com.
        Der Zucker ist süß.
        """
    )
    assert managed == expected, "{0!r} != {1!r}".format(managed, expected)
def test_managed_source_hash_indifferent_case(self):
    """
    Test passing a source_hash as an uppercase hash.

    This is a regression test for Issue #38914 and Issue #48230 (test=true use).
    """
    name = os.path.join(RUNTIME_VARS.TMP, "source_hash_indifferent_case")
    state_name = "file_|-{0}_|-{0}_|-managed".format(name)
    local_path = os.path.join(RUNTIME_VARS.BASE_FILES, "hello_world.txt")
    if IS_WINDOWS:
        # CRLF vs LF causes a different hash on windows
        actual_hash = (
            "92b772380a3f8e27a93e57e6deeca6c01da07f5aadce78bb2fbb20de10a66925"
        )
    else:
        actual_hash = (
            "c98c24b677eff44860afea6f493bbaec5bb1c4cbb209c6fc2bbb47f66ff2ad31"
        )
    uppercase_hash = actual_hash.upper()

    try:
        # Lay down tmp file to test against
        self.run_state(
            "file.managed", name=name, source=local_path, source_hash=actual_hash
        )

        # The uppercase hash must be accepted with no changes, first for a
        # real run and then for a test=True run
        for extra_kwargs in ({}, {"test": True}):
            ret = self.run_state(
                "file.managed",
                name=name,
                source=local_path,
                source_hash=uppercase_hash,
                **extra_kwargs
            )
            assert ret[state_name]["result"] is True
            assert ret[state_name]["changes"] == {}
    finally:
        # Clean Up File
        if os.path.exists(name):
            os.remove(name)
@with_tempfile(create=False)
def test_managed_latin1_diff(self, name):
    """
    Tests that latin-1 file contents are represented properly in the diff
    """

    def _manage_from(source):
        """
        Manage ``name`` from ``source`` and return the single state
        return after checking that it succeeded
        """
        ret = self.run_state("file.managed", name=name, source=source)
        ret = ret[next(iter(ret))]
        assert ret["result"] is True, ret
        return ret

    # Lay down the initial file
    _manage_from("salt://issue-48777/old.html")

    # Replace it with the new file and check the diff
    replaced = _manage_from("salt://issue-48777/new.html")
    diff_lines = replaced["changes"]["diff"].split(os.linesep)
    assert "+räksmörgås" in diff_lines, diff_lines
@with_tempfile()
def test_managed_keep_source_false_salt(self, name):
    """
    This test ensures that we properly clean the cached file if keep_source
    is set to False, for source files using a salt:// URL
    """
    saltenv = "base"
    source = "salt://grail/scene33"

    # Run the state
    state_ret = self.run_state(
        "file.managed", name=name, source=source, saltenv=saltenv, keep_source=False
    )
    assert state_ret[next(iter(state_ret))]["result"] is True

    # Now make sure that the file is not cached
    cached_path = self.run_function("cp.is_cached", [source, saltenv])
    assert cached_path == "", "File is still cached at {0}".format(cached_path)
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_file_managed_onchanges(self, file1, file2):
    """
    Test file.managed state with onchanges
    """
    source = "salt://testfile"
    pillar = {"file1": file1, "file2": file2, "source": source, "req": "onchanges"}

    # Lay down the file used in the below SLS to ensure that when it is
    # run, there are no changes.
    self.run_state("file.managed", name=file2, source=source)

    raw_ret = self.run_function(
        "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
    )
    ret = self.repack_state_returns(raw_ret)

    # 'one' has pending changes in test mode (None); 'three' is already in
    # the desired state (True)
    assert ret["one"]["result"] is None, ret["one"]["result"]
    assert ret["three"]["result"] is True, ret["three"]["result"]

    # The first file state should have changes, since a new file was
    # created. The other one should not, since we already created that file
    # before applying the SLS file.
    assert ret["one"]["changes"]
    assert not ret["three"]["changes"], ret["three"]["changes"]

    # The state watching 'one' should have been run due to changes
    assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]

    # The state watching 'three' should not have been run
    not_run = "State was not run because none of the onchanges reqs changed"
    assert ret["four"]["comment"] == not_run, ret["four"]["comment"]
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_file_managed_prereq(self, file1, file2):
    """
    Test file.managed state with prereq
    """
    source = "salt://testfile"
    pillar = {"file1": file1, "file2": file2, "source": source, "req": "prereq"}

    # Lay down the file used in the below SLS to ensure that when it is
    # run, there are no changes.
    self.run_state("file.managed", name=file2, source=source)

    raw_ret = self.run_function(
        "state.apply", mods="onchanges_prereq", pillar=pillar, test=True,
    )
    ret = self.repack_state_returns(raw_ret)

    # 'one' has pending changes in test mode (None); 'three' is already in
    # the desired state (True)
    assert ret["one"]["result"] is None, ret["one"]["result"]
    assert ret["three"]["result"] is True, ret["three"]["result"]

    # The first file state should have changes, since a new file was
    # created. The other one should not, since we already created that file
    # before applying the SLS file.
    assert ret["one"]["changes"]
    assert not ret["three"]["changes"], ret["three"]["changes"]

    # The state watching 'one' should have been run due to changes
    assert ret["two"]["comment"] == "Success!", ret["two"]["comment"]

    # The state watching 'three' should not have been run
    assert ret["four"]["comment"] == "No changes detected", ret["four"]["comment"]
def test_directory(self):
    """
    file.directory must succeed and leave a directory at the given path
    """
    new_dir = os.path.join(RUNTIME_VARS.TMP, "a_new_dir")
    self.assertSaltTrueReturn(self.run_state("file.directory", name=new_dir))
    self.assertTrue(os.path.isdir(new_dir))
def test_directory_symlink_dry_run(self):
    """
    Ensure that symlinks are followed when file.directory is run with
    test=True
    """
    tmp_dir = os.path.join(RUNTIME_VARS.TMP, "pgdata")
    sym_dir = os.path.join(RUNTIME_VARS.TMP, "pg_data")
    # Ownership is checked on Windows, the numeric mode everywhere else
    if IS_WINDOWS:
        platform_kwargs = {"win_owner": "Administrators"}
    else:
        platform_kwargs = {"mode": 700}

    try:
        os.mkdir(tmp_dir, 0o700)
        self.run_function("file.symlink", [tmp_dir, sym_dir])

        ret = self.run_state(
            "file.directory",
            test=True,
            name=sym_dir,
            follow_symlinks=True,
            **platform_kwargs
        )
        self.assertSaltTrueReturn(ret)
    finally:
        if os.path.isdir(tmp_dir):
            self.run_function("file.remove", [tmp_dir])
        if os.path.islink(sym_dir):
            self.run_function("file.remove", [sym_dir])
@skip_if_not_root
@skipIf(IS_WINDOWS, "Mode not available in Windows")
def test_directory_max_depth(self):
    """
    file.directory

    Test the max_depth option by iteratively increasing the depth and
    checking that no changes deeper than max_depth have been attempted
    """

    def _get_oct_mode(name):
        """
        Return a string octal representation of the permissions for name
        """
        perms = os.stat(name).st_mode & 0o777
        return salt.utils.files.normalize_mode(oct(perms))

    top = os.path.join(RUNTIME_VARS.TMP, "top_dir")
    sub = os.path.join(top, "sub_dir")
    subsub = os.path.join(sub, "sub_sub_dir")
    dirs = [top, sub, subsub]

    initial_mode = "0111"
    changed_mode = "0555"
    # Modes expected on the directories below max_depth, keyed by depth
    initial_modes = {
        0: {sub: "0755", subsub: "0111"},
        1: {sub: "0111", subsub: "0111"},
        2: {sub: "0111", subsub: "0111"},
    }

    if not os.path.isdir(subsub):
        os.makedirs(subsub, int(initial_mode, 8))

    try:
        for depth in range(3):
            ret = self.run_state(
                "file.directory",
                name=top,
                max_depth=depth,
                dir_mode=changed_mode,
                recurse=["mode"],
            )
            self.assertSaltTrueReturn(ret)

            for level, path in enumerate(dirs):
                if level <= depth:
                    # Within max_depth: the mode must have been changed
                    self.assertEqual(changed_mode, _get_oct_mode(path))
                    continue
                # Beginning in Python 3.7, os.makedirs no longer sets
                # the mode of intermediate directories to the mode that
                # is passed.
                if sys.version_info >= (3, 7):
                    expected = initial_modes[depth][path]
                else:
                    expected = initial_mode
                self.assertEqual(expected, _get_oct_mode(path))
    finally:
        shutil.rmtree(top)
def test_test_directory(self):
    """
    file.directory with test=True must return None as its result and must
    not create the directory
    """
    target = os.path.join(RUNTIME_VARS.TMP, "a_not_dir")
    dry_run = self.run_state("file.directory", test=True, name=target)
    self.assertSaltNoneReturn(dry_run)
    self.assertFalse(os.path.isdir(target))
@with_tempdir()
def test_directory_clean(self, base_dir):
    """
    file.directory with clean=True
    """
    name = os.path.join(base_dir, "directory_clean_dir")
    strayfile = os.path.join(name, "strayfile")
    straydir = os.path.join(name, "straydir")

    # Populate the managed directory with a stray file and a stray
    # subdirectory holding one more file
    os.mkdir(name)
    with salt.utils.files.fopen(strayfile, "w"):
        pass
    if not os.path.isdir(straydir):
        os.makedirs(straydir)
    with salt.utils.files.fopen(os.path.join(straydir, "strayfile2"), "w"):
        pass

    ret = self.run_state("file.directory", name=name, clean=True)
    self.assertSaltTrueReturn(ret)
    for removed in (strayfile, straydir):
        self.assertFalse(os.path.exists(removed))
    self.assertTrue(os.path.isdir(name))
def test_directory_is_idempotent(self):
    """
    Ensure the file.directory state produces no changes when rerun.
    """
    name = os.path.join(RUNTIME_VARS.TMP, "a_dir_twice")
    if IS_WINDOWS:
        username = os.environ.get("USERNAME", "Administrators")
        domain = os.environ.get("USERDOMAIN", "")
        fullname = "{0}\\{1}".format(domain, username)
        # First run uses the domain-qualified owner, the rerun the bare name
        first_kwargs = {"win_owner": fullname}
        rerun_kwargs = {"win_owner": username}
    else:
        first_kwargs = {}
        rerun_kwargs = {}

    first = self.run_state("file.directory", name=name, **first_kwargs)
    self.assertSaltTrueReturn(first)

    rerun = self.run_state("file.directory", name=name, **rerun_kwargs)
    self.assertSaltTrueReturn(rerun)
    self.assertSaltStateChangesEqual(rerun, {})
@with_tempdir()
def test_directory_clean_exclude(self, base_dir):
    """
    file.directory with clean=True and exclude_pat set
    """

    def _touch(path):
        """
        Create (or truncate) an empty file at path
        """
        with salt.utils.files.fopen(path, "w"):
            pass

    name = os.path.join(base_dir, "directory_clean_dir")
    strayfile = os.path.join(name, "strayfile")
    straydir = os.path.join(name, "straydir")
    strayfile2 = os.path.join(straydir, "strayfile2")
    keepfile = os.path.join(straydir, "keepfile")

    if not os.path.isdir(name):
        os.makedirs(name)
    _touch(strayfile)
    if not os.path.isdir(straydir):
        os.makedirs(straydir)
    _touch(strayfile2)
    _touch(keepfile)

    # The pattern is matched against paths using the native separator
    if IS_WINDOWS:
        exclude_pat = "E@^straydir(|\\\\keepfile)$"
    else:
        exclude_pat = "E@^straydir(|/keepfile)$"

    ret = self.run_state(
        "file.directory", name=name, clean=True, exclude_pat=exclude_pat
    )
    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertFalse(os.path.exists(strayfile2))
    self.assertTrue(os.path.exists(keepfile))
@skipIf(IS_WINDOWS, "Skip on windows")
@with_tempdir()
def test_test_directory_clean_exclude(self, base_dir):
    """
    file.directory with test=True, clean=True and exclude_pat set

    Nothing may be removed in test mode, and the comment must list the
    files that would be removed but not the excluded one.

    Skipped on windows because clean and exclude_pat not supported by
    salt.states.file._check_directory_win
    """
    name = os.path.join(base_dir, "directory_clean_dir")
    os.mkdir(name)

    strayfile = os.path.join(name, "strayfile")
    with salt.utils.files.fopen(strayfile, "w"):
        pass

    straydir = os.path.join(name, "straydir")
    if not os.path.isdir(straydir):
        os.makedirs(straydir)

    strayfile2 = os.path.join(straydir, "strayfile2")
    with salt.utils.files.fopen(strayfile2, "w"):
        pass

    keepfile = os.path.join(straydir, "keepfile")
    with salt.utils.files.fopen(keepfile, "w"):
        pass

    # No Windows variant of the pattern is needed: the test is skipped there
    exclude_pat = "E@^straydir(|/keepfile)$"

    ret = self.run_state(
        "file.directory", test=True, name=name, clean=True, exclude_pat=exclude_pat
    )

    comment = next(six.itervalues(ret))["comment"]

    self.assertSaltNoneReturn(ret)
    self.assertTrue(os.path.exists(strayfile))
    self.assertTrue(os.path.exists(strayfile2))
    self.assertTrue(os.path.exists(keepfile))

    self.assertIn(strayfile, comment)
    self.assertIn(strayfile2, comment)
    self.assertNotIn(keepfile, comment)
@with_tempdir()
def test_directory_clean_require_in(self, name):
    """
    file.directory test with clean=True and require_in file
    """
    state_name = "file-FileTest-test_directory_clean_require_in"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_name + ".sls")

    # A file the SLS does not manage, and the path of the one it does
    wrong_file = os.path.join(name, "wrong")
    good_file = os.path.join(name, "bar")
    with salt.utils.files.fopen(wrong_file, "w") as stray:
        stray.write("foo")

    sls_template = """\
        some_dir:
          file.directory:
            - name: {name}
            - clean: true

        {good_file}:
          file.managed:
            - require_in:
              - file: some_dir
        """
    with salt.utils.files.fopen(state_file, "w") as sls_handle:
        self.addCleanup(os.remove, state_file)
        sls_handle.write(
            textwrap.dedent(sls_template.format(name=name, good_file=good_file))
        )

    self.run_function("state.sls", [state_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
@with_tempdir()
def test_directory_clean_require_in_with_id(self, name):
    """
    file.directory test with clean=True and require_in file with an ID
    different from the file name
    """
    state_name = "file-FileTest-test_directory_clean_require_in_with_id"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_name + ".sls")

    # A file the SLS does not manage, and the path of the one it does
    wrong_file = os.path.join(name, "wrong")
    good_file = os.path.join(name, "bar")
    with salt.utils.files.fopen(wrong_file, "w") as stray:
        stray.write("foo")

    sls_template = """\
        some_dir:
          file.directory:
            - name: {name}
            - clean: true

        some_file:
          file.managed:
            - name: {good_file}
            - require_in:
              - file: some_dir
        """
    with salt.utils.files.fopen(state_file, "w") as sls_handle:
        self.addCleanup(os.remove, state_file)
        sls_handle.write(
            textwrap.dedent(sls_template.format(name=name, good_file=good_file))
        )

    self.run_function("state.sls", [state_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
@skipIf(
    salt.utils.platform.is_darwin(),
    "WAR ROOM TEMPORARY SKIP, Test is flaky on macosx",
)
@with_tempdir()
def test_directory_clean_require_with_name(self, name):
    """
    file.directory test with clean=True and require with a file state
    relatively to the state's name, not its ID.
    """
    # Use an SLS name of its own so this test never writes or applies the
    # file that belongs to test_directory_clean_require_in_with_id
    state_name = "file-FileTest-test_directory_clean_require_with_name"
    state_filename = state_name + ".sls"
    state_file = os.path.join(RUNTIME_VARS.BASE_FILES, state_filename)

    wrong_file = os.path.join(name, "wrong")
    with salt.utils.files.fopen(wrong_file, "w") as fp:
        fp.write("foo")
    good_file = os.path.join(name, "bar")

    with salt.utils.files.fopen(state_file, "w") as fp:
        self.addCleanup(os.remove, state_file)
        fp.write(
            textwrap.dedent(
                """\
                some_dir:
                  file.directory:
                    - name: {name}
                    - clean: true
                    - require:
                      # This requirement refers to the name of the following
                      # state, not its ID.
                      - file: {good_file}

                some_file:
                  file.managed:
                    - name: {good_file}
                """.format(
                    name=name, good_file=good_file
                )
            )
        )

    self.run_function("state.sls", [state_name])
    self.assertTrue(os.path.exists(good_file))
    self.assertFalse(os.path.exists(wrong_file))
def test_directory_broken_symlink(self):
    """
    Ensure that file.directory works even if a directory
    contains broken symbolic link
    """
    tmp_dir = os.path.join(RUNTIME_VARS.TMP, "foo")
    null_file = "{0}/null".format(tmp_dir)
    broken_link = "{0}/broken".format(tmp_dir)
    if IS_WINDOWS:
        platform_kwargs = {"follow_symlinks": True, "win_owner": "Administrators"}
    else:
        platform_kwargs = {"file_mode": 644, "dir_mode": 755}

    try:
        os.mkdir(tmp_dir, 0o700)
        # null_file is never created, so the link points at nothing
        self.run_function("file.symlink", [null_file, broken_link])

        ret = self.run_state(
            "file.directory", name=tmp_dir, recurse=["mode"], **platform_kwargs
        )
        self.assertSaltTrueReturn(ret)
    finally:
        if os.path.isdir(tmp_dir):
            self.run_function("file.remove", [tmp_dir])
@with_tempdir(create=False)
def test_recurse(self, name):
    """
    file.recurse must copy the salt://grail tree, including nested files
    """
    self.assertSaltTrueReturn(
        self.run_state("file.recurse", name=name, source="salt://grail")
    )
    nested_file = os.path.join(name, "36", "scene")
    self.assertTrue(os.path.isfile(nested_file))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_recurse_specific_env(self, dir1, dir2):
    """
    file.recurse selecting the prod environment, once through __env__ and
    once through saltenv
    """
    for target, env_kwarg in ((dir1, {"__env__": "prod"}), (dir2, {"saltenv": "prod"})):
        ret = self.run_state(
            "file.recurse", name=target, source="salt://holy", **env_kwarg
        )
        self.assertSaltTrueReturn(ret)
        self.assertTrue(os.path.isfile(os.path.join(target, "32", "scene")))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_recurse_specific_env_in_url(self, dir1, dir2):
    """
    file.recurse selecting the prod environment through the ?saltenv=
    query of the source URL
    """
    for target in (dir1, dir2):
        ret = self.run_state(
            "file.recurse", name=target, source="salt://holy?saltenv=prod"
        )
        self.assertSaltTrueReturn(ret)
        self.assertTrue(os.path.isfile(os.path.join(target, "32", "scene")))
@with_tempdir(create=False)
def test_test_recurse(self, name):
    """
    file.recurse with test=True must return None as its result and must
    not create anything on disk
    """
    dry_run = self.run_state(
        "file.recurse", test=True, name=name, source="salt://grail",
    )
    self.assertSaltNoneReturn(dry_run)
    nested_file = os.path.join(name, "36", "scene")
    self.assertFalse(os.path.isfile(nested_file))
    self.assertFalse(os.path.exists(name))
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_test_recurse_specific_env(self, dir1, dir2):
    """
    file.recurse with test=True against the prod environment, selected
    once through __env__ and once through saltenv; nothing may be created
    """
    for target, env_kwarg in ((dir1, {"__env__": "prod"}), (dir2, {"saltenv": "prod"})):
        ret = self.run_state(
            "file.recurse", test=True, name=target, source="salt://holy", **env_kwarg
        )
        self.assertSaltNoneReturn(ret)
        self.assertFalse(os.path.isfile(os.path.join(target, "32", "scene")))
        self.assertFalse(os.path.exists(target))
@with_tempdir(create=False)
def test_recurse_template(self, name):
    """
    file.recurse with jinja template enabled
    """
    marker = "TEMPLATE TEST STRING"
    ret = self.run_state(
        "file.recurse",
        name=name,
        source="salt://grail",
        template="jinja",
        defaults={"spam": marker},
    )
    self.assertSaltTrueReturn(ret)

    # The value passed as 'spam' must show up in the rendered file
    rendered_path = os.path.join(name, "scene33")
    with salt.utils.files.fopen(rendered_path, "r") as handle:
        rendered = handle.read()
    self.assertIn(marker, rendered)
@with_tempdir()
def test_recurse_clean(self, name):
    """
    file.recurse with clean=True
    """
    strayfile = os.path.join(name, "strayfile")
    dir_in_source = os.path.join(name, "36")
    file_in_source = os.path.join(name, "scene33")

    with salt.utils.files.fopen(strayfile, "w"):
        pass
    # Corner cases: replacing file with a directory and vice versa
    with salt.utils.files.fopen(dir_in_source, "w"):
        pass
    os.makedirs(file_in_source)

    ret = self.run_state(
        "file.recurse", name=name, source="salt://grail", clean=True
    )
    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertTrue(os.path.isfile(os.path.join(dir_in_source, "scene")))
    self.assertTrue(os.path.isfile(file_in_source))
@with_tempdir()
def test_recurse_clean_specific_env(self, name):
    """
    file.recurse with clean=True and __env__=prod
    """
    strayfile = os.path.join(name, "strayfile")
    dir_in_source = os.path.join(name, "32")
    file_in_source = os.path.join(name, "scene34")

    with salt.utils.files.fopen(strayfile, "w"):
        pass
    # Corner cases: replacing file with a directory and vice versa
    with salt.utils.files.fopen(dir_in_source, "w"):
        pass
    os.makedirs(file_in_source)

    ret = self.run_state(
        "file.recurse", name=name, source="salt://holy", clean=True, __env__="prod"
    )
    self.assertSaltTrueReturn(ret)
    self.assertFalse(os.path.exists(strayfile))
    self.assertTrue(os.path.isfile(os.path.join(dir_in_source, "scene")))
    self.assertTrue(os.path.isfile(file_in_source))
@skipIf(IS_WINDOWS, "Skip on windows")
@with_tempdir()
def test_recurse_issue_34945(self, base_dir):
    """
    This tests the case where the source dir for the file.recurse state
    does not contain any files (only subdirectories), and the dir_mode is
    being managed. For a long time, this corner case resulted in the top
    level of the destination directory being created with the wrong initial
    permissions, a problem that would be corrected later on in the
    file.recurse state via running state.directory. However, the
    file.directory state only gets called when there are files to be
    managed in that directory, and when the source directory contains only
    subdirectories, the incorrectly-set initial perms would not be
    repaired.

    This was fixed in https://github.com/saltstack/salt/pull/35309

    Skipped on windows because dir_mode is not supported.
    """
    issue_dir = "issue-34945"
    dir_mode = "2775"
    name = os.path.join(base_dir, issue_dir)

    ret = self.run_state(
        "file.recurse", name=name, source="salt://" + issue_dir, dir_mode=dir_mode
    )
    self.assertSaltTrueReturn(ret)

    # Compare the last four octal digits of the destination's mode
    permission_bits = stat.S_IMODE(os.stat(name).st_mode)
    actual_dir_mode = oct(permission_bits)[-4:]
    self.assertEqual(dir_mode, actual_dir_mode)
@with_tempdir(create=False)
def test_recurse_issue_40578(self, name):
    """
    file.recurse must not raise when file.source_list encounters a file
    with a unicode filename.
    """
    ret = self.run_state("file.recurse", name=name, source="salt://соль")
    self.assertSaltTrueReturn(ret)

    py2_on_windows = six.PY2 and IS_WINDOWS
    if not py2_on_windows:
        found = salt.utils.data.decode(os.listdir(name), normalize=True)
    else:
        # Hand os.listdir a unicode path so that Python 2 on Windows does
        # not try to decode the filenames with the system encoding.
        found = os.listdir(name.decode("utf-8"))

    expected = ["foo.txt", "спам.txt", "яйца.txt"]
    self.assertEqual(sorted(found), sorted(expected))
@with_tempfile()
def test_replace(self, name):
    """
    file.replace: basic pattern substitution without a backup
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("change_me")

    state_kwargs = {
        "name": name,
        "pattern": "change",
        "repl": "salt",
        "backup": False,
    }
    ret = self.run_state("file.replace", **state_kwargs)

    with salt.utils.files.fopen(name, "r") as handle:
        content = handle.read()
    self.assertIn("salt", content)
    self.assertSaltTrueReturn(ret)
@with_tempdir()
def test_replace_issue_18612(self, base_dir):
    """
    Regression test for the file.replace behaviour described in #18612:
    with 'prepend_if_not_found' or 'append_if_not_found' the file grew
    without bound because file.replace did not check whether the change had
    already been applied.

    Case description:
    The file contains a single commented line. After three runs that line
    must be uncommented and nothing else may have changed.
    """
    target = os.path.join(base_dir, "test_replace_issue_18612")
    with salt.utils.files.fopen(target, "w+") as handle:
        handle.write("# en_US.UTF-8")

    state_kwargs = {
        "name": target,
        "pattern": "^# en_US.UTF-8$",
        "repl": "en_US.UTF-8",
        "append_if_not_found": True,
    }
    results = [self.run_state("file.replace", **state_kwargs) for _ in range(3)]

    # The file must still consist of exactly one line after three runs
    with salt.utils.files.fopen(target, "r") as handle:
        line_count = len(handle.readlines())
    self.assertTrue(line_count == 1)

    # The replacement itself must have happened
    with salt.utils.files.fopen(target, "r") as handle:
        content = handle.read()
    self.assertTrue(content.startswith("en_US.UTF-8"))

    # Every single run must have reported success
    for result in results:
        self.assertSaltTrueReturn(result)
@with_tempdir()
def test_replace_issue_18612_prepend(self, base_dir):
    """
    Regression test for #18612, prepend_if_not_found variant.

    Case description:
    The fixture holds several lines, none of which match the pattern or the
    replacement. Three runs of file.replace with prepend_if_not_found=True
    must yield the ``.out`` fixture, leave a ``.bak`` copy equal to the
    ``.in`` fixture and report success each time.
    """
    test_name = "test_replace_issue_18612_prepend"
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, "file.replace")
    path_in = os.path.join(fixture_dir, test_name + ".in")
    path_out = os.path.join(fixture_dir, test_name + ".out")
    path_test = os.path.join(base_dir, test_name)

    # Seed the working copy from the input fixture
    shutil.copyfile(path_in, path_test)

    state_kwargs = {
        "name": path_test,
        "pattern": "^# en_US.UTF-8$",
        "repl": "en_US.UTF-8",
        "prepend_if_not_found": True,
    }
    results = [self.run_state("file.replace", **state_kwargs) for _ in range(3)]

    # Resulting file matches the expected fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # Backup matches the untouched input
    self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
    # Every run succeeded
    for result in results:
        self.assertSaltTrueReturn(result)
@with_tempdir()
def test_replace_issue_18612_append(self, base_dir):
    """
    Regression test for #18612, append_if_not_found variant.

    Case description:
    The fixture holds several lines, none of which match the pattern or the
    replacement. Three runs of file.replace with append_if_not_found=True
    must yield the ``.out`` fixture, leave a ``.bak`` copy equal to the
    ``.in`` fixture and report success each time.
    """
    test_name = "test_replace_issue_18612_append"
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, "file.replace")
    path_in = os.path.join(fixture_dir, test_name + ".in")
    path_out = os.path.join(fixture_dir, test_name + ".out")
    path_test = os.path.join(base_dir, test_name)

    # Seed the working copy from the input fixture
    shutil.copyfile(path_in, path_test)

    results = []
    for _ in range(3):
        outcome = self.run_state(
            "file.replace",
            name=path_test,
            pattern="^# en_US.UTF-8$",
            repl="en_US.UTF-8",
            append_if_not_found=True,
        )
        results.append(outcome)

    # Resulting file matches the expected fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # Backup matches the untouched input
    self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
    # Every run succeeded
    for outcome in results:
        self.assertSaltTrueReturn(outcome)
@with_tempdir()
def test_replace_issue_18612_append_not_found_content(self, base_dir):
    """
    Regression test for #18612, append_if_not_found with not_found_content.

    Case description:
    The fixture holds several lines, none of which match the pattern or the
    replacement. The 'not_found_content' value is what should be appended;
    the result is compared against the ``.out`` fixture and the backup
    against the ``.in`` fixture.
    """
    test_name = "test_replace_issue_18612_append_not_found_content"
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, "file.replace")
    path_in = os.path.join(fixture_dir, test_name + ".in")
    path_out = os.path.join(fixture_dir, test_name + ".out")
    path_test = os.path.join(base_dir, test_name)

    # Seed the working copy from the input fixture
    shutil.copyfile(path_in, path_test)

    state_kwargs = {
        "name": path_test,
        "pattern": "^# en_US.UTF-8$",
        "repl": "en_US.UTF-8",
        "append_if_not_found": True,
        "not_found_content": "THIS LINE WASN'T FOUND! SO WE'RE APPENDING IT HERE!",
    }
    results = [self.run_state("file.replace", **state_kwargs) for _ in range(3)]

    # Resulting file matches the expected fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # Backup matches the untouched input
    self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
    # Every run succeeded
    for result in results:
        self.assertSaltTrueReturn(result)
@with_tempdir()
def test_replace_issue_18612_change_mid_line_with_comment(self, base_dir):
    """
    Regression test for #18612, replacing a commented key=value pair.

    Case description:
    The fixture holds 5 key=value pairs. The commented pair ``#foo=bar``
    should become ``foo=salt``, i.e. the value changes and the leading
    comment char (#) is removed.
    """
    test_name = "test_replace_issue_18612_change_mid_line_with_comment"
    fixture_dir = os.path.join(RUNTIME_VARS.FILES, "file.replace")
    path_in = os.path.join(fixture_dir, test_name + ".in")
    path_out = os.path.join(fixture_dir, test_name + ".out")
    path_test = os.path.join(base_dir, test_name)

    # Seed the working copy from the input fixture
    shutil.copyfile(path_in, path_test)

    results = []
    for _ in range(3):
        outcome = self.run_state(
            "file.replace",
            name=path_test,
            # Not a raw string: the pattern carries literal CR/LF characters
            pattern="^#foo=bar($|(?=\r\n))",
            repl="foo=salt",
            append_if_not_found=True,
        )
        results.append(outcome)

    # Resulting file matches the expected fixture
    self.assertTrue(filecmp.cmp(path_test, path_out))
    # Backup matches the untouched input
    self.assertTrue(filecmp.cmp(path_test + ".bak", path_in))
    # Every run succeeded
    for outcome in results:
        self.assertSaltTrueReturn(outcome)
@with_tempdir()
def test_replace_issue_18841_no_changes(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18841:
    using file.replace in a way which shouldn't modify the file at all
    resulted in a changed mtime of the original file and a backup file
    being created.

    Case description:
    The tested file contains multiple lines
    The tested file contains a line already matching the replacement (no change needed)
    The tested file's content shouldn't change at all
    The tested file's mtime shouldn't change at all
    No backup file should be created
    """
    test_name = "test_replace_issue_18841_no_changes"
    path_in = os.path.join(
        RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
    )
    path_test = os.path.join(base_dir, test_name)

    # create test file based on initial template
    shutil.copyfile(path_in, path_test)

    # get (m|a)time of file
    fstats_orig = os.stat(path_test)

    # define how far we predate the file
    age = 5 * 24 * 60 * 60

    # set (a|m)time of file 5 days into the past; os.utime expects the
    # tuple in (atime, mtime) order
    os.utime(path_test, (fstats_orig.st_atime - age, fstats_orig.st_mtime - age))

    # Read the timestamp back instead of recomputing it, so that any
    # rounding applied while storing it cannot cause a false mismatch.
    mtime_before = os.stat(path_test).st_mtime

    ret = self.run_state(
        "file.replace",
        name=path_test,
        pattern="^hello world$",
        repl="goodbye world",
        show_changes=True,
        flags=["IGNORECASE"],
        backup=False,
    )

    # get mtime of file after the state ran
    mtime_after = os.stat(path_test).st_mtime

    # ensure, the file content didn't change
    self.assertTrue(filecmp.cmp(path_in, path_test))

    # ensure no backup file was created
    self.assertFalse(os.path.exists(path_test + ".bak"))

    # ensure the file's mtime didn't change (assertTrue(a, b) would only
    # check the truthiness of a and use b as the failure message)
    self.assertEqual(mtime_after, mtime_before)

    # ensure, all 'file.replace' runs reported success
    self.assertSaltTrueReturn(ret)
def test_serialize(self):
    """
    file.serialize must write a data structure that is both serialized and
    formatted properly (JSON formatter)
    """
    path_test = os.path.join(RUNTIME_VARS.TMP, "test_serialize")
    dataset = {
        "name": "naive",
        "description": "A basic test",
        "a_list": ["first_element", "second_element"],
        "finally": "the last item",
    }
    self.run_state(
        "file.serialize", name=path_test, dataset=dataset, formatter="json"
    )

    with salt.utils.files.fopen(path_test, "rb") as handle:
        raw_bytes = handle.read()
    serialized_file = salt.utils.stringutils.to_unicode(raw_bytes)

    # The JSON serializer uses LF even on OSes where os.sep is CRLF.
    # NOTE(review): the leading whitespace of the lines below has to match
    # the serializer's indent width exactly - confirm against real output.
    expected_lines = (
        "{",
        ' "a_list": [',
        ' "first_element",',
        ' "second_element"',
        " ],",
        ' "description": "A basic test",',
        ' "finally": "the last item",',
        ' "name": "naive"',
        "}",
        "",
    )
    expected_file = "\n".join(expected_lines)
    self.assertEqual(serialized_file, expected_file)
@with_tempfile(create=False)
def test_serializer_deserializer_opts(self, name):
    """
    Exercise the serializer_opts and deserializer_opts options of
    file.serialize with the configparser formatter
    """
    first_dataset = {"foo": {"bar": "%(x)s"}}
    second_dataset = {"foo": {"abc": 123}}
    merged = {"foo": {"y": "not_used", "x": "baz", "abc": 123, "bar": "baz"}}

    first_run = self.run_state(
        "file.serialize",
        name=name,
        dataset=first_dataset,
        formatter="configparser",
        deserializer_opts=[{"defaults": {"y": "not_used"}}],
    )
    first_result = first_run[next(iter(first_run))]
    assert first_result["result"], first_result
    # deserializer_opts was passed without merge_if_exists=True, which is
    # expected to produce a warning.
    assert "warnings" in first_result

    # Second run: merge_if_exists plus both option sets. The deserializer
    # defaults drive the string interpolation of the %(x)s written by the
    # first run (i.e. bar should become baz).
    second_run = self.run_state(
        "file.serialize",
        name=name,
        dataset=second_dataset,
        formatter="configparser",
        merge_if_exists=True,
        serializer_opts=[{"defaults": {"y": "not_used"}}],
        deserializer_opts=[{"defaults": {"x": "baz"}}],
    )
    second_result = second_run[next(iter(second_run))]
    assert second_result["result"], second_result

    with salt.utils.files.fopen(name) as handle:
        serialized_data = salt.serializers.configparser.deserialize(handle)

    # Debug output to show how the two structures differ if the
    # assertions below fail.
    log.debug("serialized_data = %r", serialized_data)
    log.debug("merged = %r", merged)

    actual_section = serialized_data["foo"]
    expected_section = merged["foo"]
    # serializing with a default of 'y' adds y = not_used into foo
    assert actual_section["y"] == expected_section["y"]
    # deserializing with a default of x = baz interpolates %(x)s, so bar
    # ends up as baz
    assert actual_section["bar"] == expected_section["bar"]
@with_tempdir()
def test_replace_issue_18841_omit_backup(self, base_dir):
    """
    Test the (mis-)behaviour of file.replace as described in #18841:
    using file.replace in a way which shouldn't modify the file at all
    resulted in a changed mtime of the original file and a backup file
    being created.

    Case description:
    The tested file contains multiple lines
    The tested file contains a line already matching the replacement (no change needed)
    The tested file's content shouldn't change at all
    The tested file's mtime shouldn't change at all
    No backup file should be created, although backup=False isn't explicitly defined
    """
    test_name = "test_replace_issue_18841_omit_backup"
    path_in = os.path.join(
        RUNTIME_VARS.FILES, "file.replace", "{0}.in".format(test_name)
    )
    path_test = os.path.join(base_dir, test_name)

    # create test file based on initial template
    shutil.copyfile(path_in, path_test)

    # get (m|a)time of file
    fstats_orig = os.stat(path_test)

    # define how far we predate the file
    age = 5 * 24 * 60 * 60

    # set (a|m)time of file 5 days into the past; os.utime expects the
    # tuple in (atime, mtime) order
    os.utime(path_test, (fstats_orig.st_atime - age, fstats_orig.st_mtime - age))

    # Read the timestamp back instead of recomputing it, so that any
    # rounding applied while storing it cannot cause a false mismatch.
    mtime_before = os.stat(path_test).st_mtime

    ret = self.run_state(
        "file.replace",
        name=path_test,
        pattern="^hello world$",
        repl="goodbye world",
        show_changes=True,
        flags=["IGNORECASE"],
    )

    # get mtime of file after the state ran
    mtime_after = os.stat(path_test).st_mtime

    # ensure, the file content didn't change
    self.assertTrue(filecmp.cmp(path_in, path_test))

    # ensure no backup file was created
    self.assertFalse(os.path.exists(path_test + ".bak"))

    # ensure the file's mtime didn't change (assertTrue(a, b) would only
    # check the truthiness of a and use b as the failure message)
    self.assertEqual(mtime_after, mtime_before)

    # ensure, all 'file.replace' runs reported success
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_comment(self, name):
    """
    file.comment: dry run, first application, repeated application and a
    dry run against the already-commented file
    """

    def read_back():
        # Return the current content of the file under test
        with salt.utils.files.fopen(name, "r") as handle:
            return handle.read()

    # Start from a file with a single uncommented line
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("comment_me")

    # With test=True the first run must report pending changes ("None")
    ret = self.run_state("file.comment", test=True, name=name, regex="^comment")
    self.assertSaltNoneReturn(ret)

    # First real run: succeeds and comments the line
    ret = self.run_state("file.comment", name=name, regex="^comment")
    self.assertSaltTrueReturn(ret)
    self.assertTrue(read_back().startswith("#comment"))

    # Second real run: still succeeds and the line stays commented
    ret = self.run_state("file.comment", name=name, regex="^comment")
    self.assertSaltTrueReturn(ret)
    self.assertTrue(read_back().startswith("#comment"))

    # With test=True an already commented file returns "True", not "None"
    ret = self.run_state("file.comment", test=True, name=name, regex="^comment")
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_test_comment(self, name):
    """
    file.comment test interface: the file must stay untouched
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("comment_me")

    state_kwargs = {"test": True, "name": name, "regex": ".*comment.*"}
    ret = self.run_state("file.comment", **state_kwargs)

    with salt.utils.files.fopen(name, "r") as handle:
        content = handle.read()
    self.assertNotIn("#comment", content)
    self.assertSaltNoneReturn(ret)
@with_tempfile()
def test_uncomment(self, name):
    """
    file.uncomment: a commented line gets its comment marker removed
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("#comment_me")

    state_kwargs = {"name": name, "regex": "^comment"}
    ret = self.run_state("file.uncomment", **state_kwargs)

    with salt.utils.files.fopen(name, "r") as handle:
        content = handle.read()
    self.assertNotIn("#comment", content)
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_test_uncomment(self, name):
    """
    file.uncomment test interface: the file must stay commented
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("#comment_me")

    state_kwargs = {"test": True, "name": name, "regex": "^comment.*"}
    ret = self.run_state("file.uncomment", **state_kwargs)

    with salt.utils.files.fopen(name, "r") as handle:
        content = handle.read()
    self.assertIn("#comment", content)
    self.assertSaltNoneReturn(ret)
@with_tempfile()
def test_append(self, name):
    """
    file.append: the text ends up in an existing file
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("#salty!")

    state_kwargs = {"name": name, "text": "cheese"}
    ret = self.run_state("file.append", **state_kwargs)

    with salt.utils.files.fopen(name, "r") as handle:
        content = handle.read()
    self.assertIn("cheese", content)
    self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_test_append(self, name):
    """
    file.append test interface: nothing may be written to the file
    """
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write("#salty!")

    state_kwargs = {"test": True, "name": name, "text": "cheese"}
    ret = self.run_state("file.append", **state_kwargs)

    with salt.utils.files.fopen(name, "r") as handle:
        content = handle.read()
    self.assertNotIn("cheese", content)
    self.assertSaltNoneReturn(ret)
@with_tempdir()
def test_append_issue_1864_makedirs(self, base_dir):
    """
    file.append with optional directory creation; the target file is
    created if it doesn't exist
    """
    fname = "append_issue_1864_makedirs"
    nested_dir = os.path.join(base_dir, "issue_1864")

    # Non existing file gets touched
    top_level = os.path.join(base_dir, fname)
    ret = self.run_state("file.append", name=top_level, text="cheese", makedirs=True)
    self.assertSaltTrueReturn(ret)

    # Nested directory and file get touched
    nested = os.path.join(nested_dir, fname)
    ret = self.run_state("file.append", name=nested, text="cheese", makedirs=True)
    self.assertSaltTrueReturn(ret)

    # Parent directory exists but file does not and makedirs is False
    sibling = os.path.join(nested_dir, fname + "2")
    ret = self.run_state("file.append", name=sibling, text="cheese")
    self.assertSaltTrueReturn(ret)
    self.assertTrue(os.path.isfile(sibling))
@with_tempdir()
def test_prepend_issue_27401_makedirs(self, base_dir):
    """
    file.prepend with optional directory creation; the target file is
    created if it doesn't exist
    """
    fname = "prepend_issue_27401"
    nested_dir = os.path.join(base_dir, "issue_27401")

    # Non existing file gets touched
    top_level = os.path.join(base_dir, fname)
    ret = self.run_state("file.prepend", name=top_level, text="cheese", makedirs=True)
    self.assertSaltTrueReturn(ret)

    # Nested directory and file get touched
    nested = os.path.join(nested_dir, fname)
    ret = self.run_state("file.prepend", name=nested, text="cheese", makedirs=True)
    self.assertSaltTrueReturn(ret)

    # Parent directory exists but file does not and makedirs is False
    sibling = os.path.join(nested_dir, fname + "2")
    ret = self.run_state("file.prepend", name=sibling, text="cheese")
    self.assertSaltTrueReturn(ret)
    self.assertTrue(os.path.isfile(sibling))
@with_tempfile()
def test_touch(self, name):
    """
    file.touch: the file exists afterwards and the state succeeds
    """
    state_kwargs = {"name": name}
    ret = self.run_state("file.touch", **state_kwargs)
    file_present = os.path.isfile(name)
    self.assertTrue(file_present)
    self.assertSaltTrueReturn(ret)
@with_tempfile(create=False)
def test_test_touch(self, name):
    """
    file.touch test interface: no file may be created
    """
    state_kwargs = {"test": True, "name": name}
    ret = self.run_state("file.touch", **state_kwargs)
    file_present = os.path.isfile(name)
    self.assertFalse(file_present)
    self.assertSaltNoneReturn(ret)
@with_tempdir()
def test_touch_directory(self, base_dir):
    """
    file.touch applied to an existing directory
    """
    target_dir = os.path.join(base_dir, "touch_test_dir")
    os.mkdir(target_dir)

    ret = self.run_state("file.touch", name=target_dir)

    self.assertSaltTrueReturn(ret)
    still_a_directory = os.path.isdir(target_dir)
    self.assertTrue(still_a_directory)
@with_tempdir()
def test_issue_2227_file_append(self, base_dir):
    """
    Text to append includes a percent symbol
    """
    tmp_file_append = os.path.join(base_dir, "test.append")

    # Use existing states to build a file with contents to test against
    self.run_state("file.touch", name=tmp_file_append)
    for fixture in ("salt://testappend/firstif", "salt://testappend/secondif"):
        self.run_state("file.append", name=tmp_file_append, source=fixture)

    percent_text = "HISTTIMEFORMAT='%F %T '"

    # Now our real test
    try:
        snapshots = []
        # The second pass must not append the text again
        for _ in range(2):
            ret = self.run_state(
                "file.append", name=tmp_file_append, text=percent_text
            )
            self.assertSaltTrueReturn(ret)
            with salt.utils.files.fopen(tmp_file_append, "r") as handle:
                snapshots.append(handle.read())

        contents_pre, contents_post = snapshots
        self.assertEqual(contents_pre, contents_post)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising the failure
        if os.path.exists(tmp_file_append):
            shutil.copy(tmp_file_append, tmp_file_append + ".bak")
        raise
@with_tempdir()
def test_issue_2401_file_comment(self, base_dir):
    """
    file.comment rendered through state.template_str: the first run
    comments the matching line, the second run reports it as already
    commented
    """
    # Temporary file the state operates on
    tmp_file = os.path.join(base_dir, "issue-2041-comment.txt")
    with salt.utils.files.fopen(tmp_file, "w") as handle:
        handle.write("hello\nworld\n")

    # SLS template keyed by the file path
    template = "\n".join(
        (
            "{0}:".format(tmp_file),
            " file.comment:",
            " - regex: ^world",
        )
    )

    try:
        first = self.run_function("state.template_str", [template], timeout=120)
        self.assertSaltTrueReturn(first)
        self.assertNotInSaltComment("Pattern already commented", first)
        self.assertInSaltComment("Commented lines successfully", first)

        # This next time, it is already commented.
        second = self.run_function("state.template_str", [template], timeout=120)
        self.assertSaltTrueReturn(second)
        self.assertInSaltComment("Pattern already commented", second)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising the failure
        shutil.copy(tmp_file, tmp_file + ".bak")
        raise
@with_tempdir()
def test_issue_2379_file_append(self, base_dir):
    """
    file.append must append a line even though commented variants of the
    same text are already present in the file
    """
    # Temporary file the state operates on
    tmp_file = os.path.join(base_dir, "issue-2379-file-append.txt")
    initial_lines = (
        "hello\nworld\n",  # Some junk
        "#PermitRootLogin yes\n",  # Commented text
        "# PermitRootLogin yes\n",  # Commented text with space
    )
    with salt.utils.files.fopen(tmp_file, "w") as handle:
        handle.write("".join(initial_lines))

    # SLS template keyed by the file path
    template = "\n".join(
        (
            "{0}:".format(tmp_file),
            " file.append:",
            " - text: PermitRootLogin yes",
        )
    )

    try:
        ret = self.run_function("state.template_str", [template])
        self.assertSaltTrueReturn(ret)
        self.assertInSaltComment("Appended 1 lines", ret)
    except AssertionError:
        # Keep a copy of the file next to it before re-raising the failure
        shutil.copy(tmp_file, tmp_file + ".bak")
        raise
@skipIf(IS_WINDOWS, "Mode not available in Windows")
@with_tempdir(create=False)
@with_tempdir(create=False)
def test_issue_2726_mode_kwarg(self, dir1, dir2):
    """
    file.recurse must reject a plain ``mode`` argument with a helpful
    message and accept ``dir_mode``/``file_mode``
    """
    # Wrong usage: 'mode' is not a valid file.recurse argument
    bad_target = os.path.join(dir1, "bad_mode_kwarg", "testfile")
    bad_sls = os.linesep.join(
        (
            "{0}:".format(bad_target),
            " file.recurse:",
            " - source: salt://testfile",
            " - mode: 644",
        )
    )
    ret = self.run_function("state.template_str", [bad_sls])
    self.assertSaltFalseReturn(ret)
    self.assertInSaltComment(
        "'mode' is not allowed in 'file.recurse'. Please use "
        "'file_mode' and 'dir_mode'.",
        ret,
    )
    self.assertNotInSaltComment(
        "TypeError: managed() got multiple values for keyword argument 'mode'",
        ret,
    )

    # Correct usage: separate modes for directories and files
    good_target = os.path.join(dir2, "good_mode_kwargs", "testappend")
    good_sls = os.linesep.join(
        (
            "{0}:".format(good_target),
            " file.recurse:",
            " - source: salt://testappend",
            " - dir_mode: 744",
            " - file_mode: 644",
        )
    )
    ret = self.run_function("state.template_str", [good_sls])
    self.assertSaltTrueReturn(ret)
@with_tempdir()
def test_issue_8343_accumulated_require_in(self, base_dir):
    """
    file.accumulated with require_in feeding two file.blockreplace states

    An SLS file is written into the temporary state tree. It manages a file
    whose content is ``#``, then prepends one marker block filled with
    ``foo`` and appends another filled with ``bar``. Every state must
    succeed and the resulting file must match the expected line list.
    """
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8343.sls")
    testcase_filedest = os.path.join(base_dir, "issue-8343.txt")
    # Start from a clean slate in case an earlier run left files behind
    for leftover in (template_path, testcase_filedest):
        if os.path.exists(leftover):
            os.remove(leftover)

    # The bodies of the "|" block scalars (#, foo, bar) must be indented
    # deeper than the key that introduces them; otherwise YAML yields an
    # empty scalar and the expected file content below cannot be produced.
    sls_template = [
        "{0}:",
        "  file.managed:",
        "    - contents: |",
        "        #",
        "",
        "prepend-foo-accumulator-from-pillar:",
        "  file.accumulated:",
        "    - require_in:",
        "      - file: prepend-foo-management",
        "    - filename: {0}",
        "    - text: |",
        "        foo",
        "",
        "append-foo-accumulator-from-pillar:",
        "  file.accumulated:",
        "    - require_in:",
        "      - file: append-foo-management",
        "    - filename: {0}",
        "    - text: |",
        "        bar",
        "",
        "prepend-foo-management:",
        "  file.blockreplace:",
        "    - name: {0}",
        '    - marker_start: "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT"',
        '    - marker_end: "#-- end salt managed zonestart --"',
        "    - content: ''",
        "    - prepend_if_not_found: True",
        "    - backup: '.bak'",
        "    - show_changes: True",
        "",
        "append-foo-management:",
        "  file.blockreplace:",
        "    - name: {0}",
        '    - marker_start: "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT"',
        '    - marker_end: "#-- end salt managed zoneend --"',
        "    - content: ''",
        "    - append_if_not_found: True",
        "    - backup: '.bak2'",
        "    - show_changes: True",
        "",
    ]
    with salt.utils.files.fopen(template_path, "w") as fp_:
        fp_.write(os.linesep.join(sls_template).format(testcase_filedest))

    ret = self.run_function("state.sls", mods="issue-8343")
    # Check each state individually so a failure names the offending one
    for name, step in six.iteritems(ret):
        self.assertSaltTrueReturn({name: step})

    with salt.utils.files.fopen(testcase_filedest) as fp_:
        contents = fp_.read().split(os.linesep)

    expected = [
        "#-- start salt managed zonestart -- PLEASE, DO NOT EDIT",
        "foo",
        "#-- end salt managed zonestart --",
        "#",
        "#-- start salt managed zoneend -- PLEASE, DO NOT EDIT",
        "bar",
        "#-- end salt managed zoneend --",
        "",
    ]
    self.assertEqual(
        [salt.utils.stringutils.to_str(line) for line in expected], contents
    )
@with_tempdir()
@skipIf(
    salt.utils.platform.is_darwin() and six.PY2, "This test hangs on OS X on Py2"
)
def test_issue_11003_immutable_lazy_proxy_sum(self, base_dir):
    """
    file.accumulated via require_in and watch_in feeding one
    file.blockreplace

    The managed block must end up containing exactly an empty line, ``bar``
    and ``baz`` (in any order) and every state must succeed.
    """
    # causes the Import-Module ServerManager error on Windows
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-11003.sls")
    testcase_filedest = os.path.join(base_dir, "issue-11003.txt")

    # The bodies of the "|" block scalars (#, bar, baz) must be indented
    # deeper than the key that introduces them; otherwise YAML yields an
    # empty scalar and the block content checked below cannot be produced.
    sls_template = [
        "a{0}:",
        "  file.absent:",
        "    - name: {0}",
        "",
        "{0}:",
        "  file.managed:",
        "    - contents: |",
        "        #",
        "",
        "test-acc1:",
        "  file.accumulated:",
        "    - require_in:",
        "      - file: final",
        "    - filename: {0}",
        "    - text: |",
        "        bar",
        "",
        "test-acc2:",
        "  file.accumulated:",
        "    - watch_in:",
        "      - file: final",
        "    - filename: {0}",
        "    - text: |",
        "        baz",
        "",
        "final:",
        "  file.blockreplace:",
        "    - name: {0}",
        '    - marker_start: "#-- start managed zone PLEASE, DO NOT EDIT"',
        '    - marker_end: "#-- end managed zone"',
        "    - content: ''",
        "    - append_if_not_found: True",
        "    - show_changes: True",
    ]
    with salt.utils.files.fopen(template_path, "w") as fp_:
        fp_.write(os.linesep.join(sls_template).format(testcase_filedest))

    ret = self.run_function("state.sls", mods="issue-11003", timeout=600)
    # Check each state individually so a failure names the offending one
    for name, step in six.iteritems(ret):
        self.assertSaltTrueReturn({name: step})

    with salt.utils.files.fopen(testcase_filedest) as fp_:
        contents = fp_.read().split(os.linesep)

    # Slice out the lines strictly between the two markers
    begin = contents.index("#-- start managed zone PLEASE, DO NOT EDIT") + 1
    end = contents.index("#-- end managed zone")
    block_contents = contents[begin:end]
    # remove() raises ValueError if an expected line is missing; whatever
    # remains afterwards is unexpected content.
    for item in ("", "bar", "baz"):
        block_contents.remove(item)
    self.assertEqual(block_contents, [])
@with_tempdir()
def test_issue_8947_utf8_sls(self, base_dir):
    """
    Test some file operation with utf-8 characters on the sls

    This is more generic than just a file test. Feel free to move
    """
    self.maxDiff = None
    korean_1 = "한국어 시험"
    korean_2 = "첫 번째 행"
    korean_3 = "마지막 행"
    test_file = os.path.join(base_dir, "{0}.txt".format(korean_1))
    test_file_encoded = test_file
    template_path = os.path.join(RUNTIME_VARS.TMP_STATE_TREE, "issue-8947.sls")

    # Only these names are referenced by the templates below; passing them
    # explicitly avoids leaking every local (including self) into format().
    fields = {
        "test_file": test_file,
        "korean_1": korean_1,
        "korean_2": korean_2,
        "korean_3": korean_3,
    }

    # create the sls template; the nesting is significant YAML structure
    # and the "|" block scalar body must sit deeper than its key
    template = textwrap.dedent(
        """\
        some-utf8-file-create:
          file.managed:
            - name: {test_file}
            - contents: {korean_1}
            - makedirs: True
            - replace: True
            - show_diff: True
        some-utf8-file-create2:
          file.managed:
            - name: {test_file}
            - contents: |
                {korean_2}
                {korean_1}
                {korean_3}
            - replace: True
            - show_diff: True
        """.format(
            **fields
        )
    )

    if not salt.utils.platform.is_windows():
        template += textwrap.dedent(
            """\
            some-utf8-file-content-test:
              cmd.run:
                - name: 'cat "{test_file}"'
                - require:
                  - file: some-utf8-file-create2
            """.format(
                **fields
            )
        )

    # Save template file
    with salt.utils.files.fopen(template_path, "wb") as fp_:
        fp_.write(salt.utils.stringutils.to_bytes(template))

    try:
        result = self.run_function("state.sls", mods="issue-8947")
        if not isinstance(result, dict):
            raise AssertionError(
                (
                    "Something went really wrong while testing this sls:" " {0}"
                ).format(repr(result))
            )
        # difflib produces different output on python 2.6 than on >=2.7
        if sys.version_info < (2, 7):
            diff = "--- \n+++ \n@@ -1,1 +1,3 @@\n"
        else:
            diff = "--- \n+++ \n@@ -1 +1,3 @@\n"
        diff += ("+첫 번째 행{0}" " 한국어 시험{0}" "+마지막 행{0}").format(os.linesep)

        # Re-key the result by state ID (second "_|-" separated field)
        ret = {x.split("_|-")[1]: y for x, y in six.iteritems(result)}

        # Confirm initial creation of file
        self.assertEqual(
            ret["some-utf8-file-create"]["comment"],
            "File {0} updated".format(test_file_encoded),
        )
        self.assertEqual(
            ret["some-utf8-file-create"]["changes"], {"diff": "New file"}
        )

        # Confirm file was modified and that the diff was as expected
        self.assertEqual(
            ret["some-utf8-file-create2"]["comment"],
            "File {0} updated".format(test_file_encoded),
        )
        self.assertEqual(ret["some-utf8-file-create2"]["changes"], {"diff": diff})

        if salt.utils.platform.is_windows():
            import subprocess
            import win32api

            p = subprocess.Popen(
                salt.utils.stringutils.to_str(
                    "type {}".format(win32api.GetShortPathName(test_file))
                ),
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            # communicate() drains both pipes and waits for the process to
            # exit; poll() + stdout.read() never reaped the child and left
            # the stderr pipe unread.
            out, _ = p.communicate()
            self.assertEqual(
                out.decode("utf-8"),
                os.linesep.join((korean_2, korean_1, korean_3)) + os.linesep,
            )
        else:
            self.assertEqual(
                ret["some-utf8-file-content-test"]["comment"],
                'Command "cat "{0}"" run'.format(test_file_encoded),
            )
            self.assertEqual(
                ret["some-utf8-file-content-test"]["changes"]["stdout"],
                "\n".join((korean_2, korean_1, korean_3)),
            )
    finally:
        # Best-effort removal of the generated SLS file
        try:
            os.remove(template_path)
        except OSError:
            pass
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(
    TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
)
@with_tempdir()
def test_issue_12209_follow_symlinks(self, tempdir, user, group):
    """
    Ensure that symlinks are properly chowned when recursing (following
    symlinks)
    """
    # Layout: a real directory "one" and a symlink "two" pointing at it
    onedir = os.path.join(tempdir, "one")
    twodir = os.path.join(tempdir, "two")
    os.mkdir(onedir)
    os.symlink(onedir, twodir)

    state_kwargs = {
        "name": tempdir,
        "follow_symlinks": True,
        "user": user,
        "group": group,
        "recurse": ["user", "group"],
    }
    ret = self.run_state("file.directory", **state_kwargs)
    self.assertSaltTrueReturn(ret)

    # Double-check, in case the state mis-reported a True result. Since
    # symlinks are followed, the link itself is expected to still belong to
    # root while the real directory belongs to the test user.
    dir_stats = os.stat(onedir)
    link_stats = os.lstat(twodir)

    self.assertEqual(pwd.getpwuid(dir_stats.st_uid).pw_name, user)
    self.assertEqual(pwd.getpwuid(link_stats.st_uid).pw_name, "root")
    self.assertEqual(grp.getgrgid(dir_stats.st_gid).gr_name, group)
    # The group of the link is only checked when the "id" binary is found
    if salt.utils.path.which("id"):
        root_group = self.run_function("user.primary_group", ["root"])
        self.assertEqual(grp.getgrgid(link_stats.st_gid).gr_name, root_group)
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(
    TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
)
@with_tempdir()
def test_issue_12209_no_follow_symlinks(self, tempdir, user, group):
    """
    Recursive chown with ``follow_symlinks=False``: both the real directory
    and the symlink itself must end up owned by the test user/group.
    """
    # Layout: <tempdir>/one is a real directory, <tempdir>/two links to it
    real_dir = os.path.join(tempdir, "one")
    link_path = os.path.join(tempdir, "two")
    os.mkdir(real_dir)
    os.symlink(real_dir, link_path)

    state_kwargs = {
        "name": tempdir,
        "follow_symlinks": False,
        "user": user,
        "group": group,
        "recurse": ["user", "group"],
    }
    ret = self.run_state("file.directory", **state_kwargs)
    self.assertSaltTrueReturn(ret)

    # Verify on disk as well, in case the state mis-reported success.
    # Since symlinks are not followed, the link itself is expected to be
    # owned by the test user, just like the real directory.
    real_stat = os.stat(real_dir)
    link_stat = os.lstat(link_path)

    self.assertEqual(pwd.getpwuid(real_stat.st_uid).pw_name, user)
    self.assertEqual(pwd.getpwuid(link_stat.st_uid).pw_name, user)
    self.assertEqual(grp.getgrgid(real_stat.st_gid).gr_name, group)
    self.assertEqual(grp.getgrgid(link_stat.st_gid).gr_name, group)
@with_tempfile(create=False)
@with_tempfile()
def test_template_local_file(self, source, dest):
    """
    Run file.managed with a jinja template that lives on the local
    filesystem, addressing it once as a ``file://`` URL and once as a
    plain path.
    """
    with salt.utils.files.fopen(source, "w") as template:
        template.write("{{ foo }}\n")

    for source_uri in ("file://" + source, "" + source):
        ret = self.run_state(
            "file.managed",
            name=dest,
            source=source_uri,
            template="jinja",
            context={"foo": "Hello world!"},
        )
        self.assertSaltTrueReturn(ret)
@with_tempfile()
def test_template_local_file_noclobber(self, source):
    """
    file.managed must refuse to run when the local source path and the
    destination path are the same file.
    """
    with salt.utils.files.fopen(source, "w") as template:
        template.write("{{ foo }}\n")

    state_kwargs = {
        "name": source,
        "source": source,
        "template": "jinja",
        "context": {"foo": "Hello world!"},
    }
    ret = self.run_state("file.managed", **state_kwargs)
    self.assertSaltFalseReturn(ret)

    # The return dict is keyed by state ID; inspect its first entry
    state_result = ret[next(iter(ret))]
    self.assertIn(
        "Source file cannot be the same as destination",
        state_result["comment"],
    )
@with_tempfile(create=False)
@with_tempfile(create=False)
def test_issue_25250_force_copy_deletes(self, source, dest):
    """
    Ensure file.copy with force=True leaves the target file in place,
    holding the source's contents.
    """
    # Seed source and destination with two different fixture files
    fixtures = (
        (os.path.join(RUNTIME_VARS.FILES, "hosts"), source),
        (os.path.join(RUNTIME_VARS.FILES, "file/base/cheese"), dest),
    )
    for fixture, target in fixtures:
        shutil.copyfile(fixture, target)

    self.run_state("file.copy", name=dest, source=source, force=True)

    self.assertTrue(os.path.exists(dest))
    self.assertTrue(filecmp.cmp(source, dest))

    for leftover in (source, dest):
        os.remove(leftover)
@destructiveTest
@skip_if_not_root
@skipIf(IS_WINDOWS, "Windows does not report any file modes. Skipping.")
@with_tempfile()
def test_file_copy_make_dirs(self, source):
    """
    Ensure file.copy with makedirs=True applies the requested user and
    mode to the copied file and to every directory it creates.
    """
    shutil.copyfile(os.path.join(RUNTIME_VARS.FILES, "hosts"), source)

    top_dir = os.path.join(RUNTIME_VARS.TMP, "dir1")
    sub_dir = os.path.join(top_dir, "dir2")
    dest = os.path.join(sub_dir, "copied_file.txt")
    user = "salt"
    mode = "0644"

    ret = self.run_function("user.add", [user])
    self.assertTrue(ret, "Failed to add user. Are you running as sudo?")

    try:
        ret = self.run_state(
            "file.copy",
            name=dest,
            source=source,
            user=user,
            makedirs=True,
            mode=mode,
        )
        self.assertSaltTrueReturn(ret)

        # The file and both created directories must carry user and mode
        for check in (dest, top_dir, sub_dir):
            user_check = self.run_function("file.get_user", [check])
            mode_check = self.run_function("file.get_mode", [check])
            self.assertEqual(user_check, user)
            self.assertEqual(salt.utils.files.normalize_mode(mode_check), mode)
    finally:
        # Remove the created tree so a stale copy cannot influence (or be
        # mistaken for the result of) a later run.
        shutil.rmtree(top_dir, ignore_errors=True)
def test_contents_pillar_with_pillar_list(self):
    """
    Regression test for https://github.com/saltstack/salt/issues/30934:
    applying the ``file_contents_pillar`` SLS must succeed.
    """
    ret = self.run_function("state.sls", mods="file_contents_pillar")
    self.assertSaltTrueReturn(ret)
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(
    TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
)
def test_owner_after_setuid(self, user, group):
    """
    Test to check file user/group after setting setuid or setgid.
    Because Python os.chown() does reset the setuid/setgid to 0.
    https://github.com/saltstack/salt/pull/45257
    """
    # Desired configuration.
    desired = {
        "file": os.path.join(RUNTIME_VARS.TMP, "file_with_setuid"),
        "user": user,
        "group": group,
        "mode": "4750",
    }

    try:
        # Run the state.
        ret = self.run_state(
            "file.managed",
            name=desired["file"],
            user=desired["user"],
            group=desired["group"],
            mode=desired["mode"],
        )
        # Check the state result before touching the file: if the state
        # failed, os.stat() below would raise and hide the real failure.
        self.assertSaltTrueReturn(ret)

        # Read back what actually ended up on disk.
        file_stat = os.stat(desired["file"])
        result = {
            "user": pwd.getpwuid(file_stat.st_uid).pw_name,
            "group": grp.getgrgid(file_stat.st_gid).gr_name,
            "mode": oct(stat.S_IMODE(file_stat.st_mode)),
        }

        self.assertEqual(desired["user"], result["user"])
        self.assertEqual(desired["group"], result["group"])
        # oct() output carries a "0"/"0o" prefix; strip it before comparing
        self.assertEqual(desired["mode"], result["mode"].lstrip("0Oo"))
    finally:
        # Best-effort cleanup of the managed file
        try:
            os.remove(desired["file"])
        except OSError:
            pass
def test_binary_contents(self):
    """
    Managing a file whose contents are binary must succeed (i.e. must not
    cause a traceback).
    """
    gif_path = os.path.join(RUNTIME_VARS.TMP, "1px.gif")
    try:
        state_ret = self.run_state(
            "file.managed", name=gif_path, contents=BINARY_FILE
        )
        self.assertSaltTrueReturn(state_ret)
    finally:
        # Best-effort cleanup; the file may not exist if the state failed
        try:
            os.remove(gif_path)
        except OSError:
            pass
def test_binary_contents_twice(self):
    """
    This test ensures that after a binary file is created, salt can confirm
    that the file is in the correct state.
    """
    name = os.path.join(RUNTIME_VARS.TMP, "1px.gif")
    try:
        # First run ensures the file is created; the second run ensures
        # the existing file is recognised as being in the correct state.
        for _ in range(2):
            ret = self.run_state("file.managed", name=name, contents=BINARY_FILE)
            self.assertSaltTrueReturn(ret)
    finally:
        # Clean up even when an assertion above fails, so the file does
        # not leak into other tests using the same path.
        try:
            os.remove(name)
        except OSError:
            pass
@skip_if_not_root
@skipIf(not HAS_PWD, "pwd not available. Skipping test")
@skipIf(not HAS_GRP, "grp not available. Skipping test")
@with_system_user_and_group(
    TEST_SYSTEM_USER, TEST_SYSTEM_GROUP, on_existing="delete", delete=True
)
@with_tempdir()
def test_issue_48336_file_managed_mode_setuid(self, tempdir, user, group):
    """
    Ensure that the mode (including the setuid bit) is correct when
    file.managed also changes the file's owner and group.
    """
    # Named ``target`` rather than ``tempfile`` to avoid shadowing the
    # standard-library module of that name.
    target = os.path.join(tempdir, "temp_file_issue_48336")

    # Run the state
    ret = self.run_state(
        "file.managed", name=target, user=user, group=group, mode="4750",
    )
    self.assertSaltTrueReturn(ret)

    # Check that the owner and group are correct, and that the mode is
    # what we expect
    target_stats = os.stat(target)

    # oct() already returns a string; normalize it for comparison
    target_mode = salt.utils.files.normalize_mode(
        oct(stat.S_IMODE(target_stats.st_mode))
    )

    self.assertEqual(target_mode, "4750")
    self.assertEqual(pwd.getpwuid(target_stats.st_uid).pw_name, user)
    self.assertEqual(grp.getgrgid(target_stats.st_gid).gr_name, group)
@with_tempdir()
def test_issue_48557(self, tempdir):
    """
    file.line in insert mode must place the new line directly after the
    ``after`` line, using the platform line separator.
    """
    target = os.path.join(tempdir, "temp_file_issue_48557")

    initial = os.linesep.join(["test1", "test2", "test3", ""]).encode("utf-8")
    with salt.utils.files.fopen(target, "wb") as handle:
        handle.write(initial)

    ret = self.run_state(
        "file.line", name=target, after="test2", mode="insert", content="test4"
    )
    self.assertSaltTrueReturn(ret)

    with salt.utils.files.fopen(target, "rb") as handle:
        content = handle.read()

    wanted = os.linesep.join(["test1", "test2", "test4", "test3", ""]).encode(
        "utf-8"
    )
    self.assertEqual(content, wanted)
@with_tempfile()
def test_issue_50221(self, name):
    """
    The ``issue-50221`` pillar value (text followed by three line
    separators) must be returned unchanged by pillar.get and written
    unchanged to ``name`` by the ``issue-50221`` state.
    """
    expected = "abc{0}{0}{0}".format(os.linesep)

    # Use TestCase assertions rather than bare ``assert`` so the checks
    # are not stripped under ``python -O`` and match the rest of the file.
    ret = self.run_function("pillar.get", ["issue-50221"])
    self.assertEqual(ret, expected)

    ret = self.run_function("state.apply", ["issue-50221"], pillar={"name": name},)
    self.assertSaltTrueReturn(ret)

    with salt.utils.files.fopen(name, "r") as fp:
        contents = fp.read()
    self.assertEqual(contents, expected)
def test_managed_file_issue_51208(self):
    """
    Test to ensure we can handle a file with escaped double-quotes
    """
    name = os.path.join(RUNTIME_VARS.TMP, "issue_51208.txt")
    src = os.path.join(RUNTIME_VARS.BASE_FILES, "issue-51208", "vimrc.stub")
    try:
        ret = self.run_state(
            "file.managed", name=name, source="salt://issue-51208/vimrc.stub"
        )
        # Check the state result first: if the state failed, opening
        # ``name`` below would raise and hide the real failure.
        self.assertSaltTrueReturn(ret)

        with salt.utils.files.fopen(src, "r") as fp_:
            master_data = fp_.read()
        with salt.utils.files.fopen(name, "r") as fp_:
            minion_data = fp_.read()
        self.assertEqual(master_data, minion_data)
    finally:
        # Best-effort cleanup of the managed file
        try:
            os.remove(name)
        except OSError:
            pass
@with_tempfile()
def test_keyvalue(self, name):
    """
    file.keyvalue: uncomment a commented-out ``PermitRootLogin`` entry in
    an sshd_config-style file and set its value to ``no``, matching the
    key case-insensitively.
    """
    sshd_config = dedent(
        """\
        # This is the sshd server system-wide configuration file. See
        # sshd_config(5) for more information.
        # The strategy used for options in the default sshd_config shipped with
        # OpenSSH is to specify options with their default value where
        # possible, but leave them commented. Uncommented options override the
        # default value.
        #Port 22
        #AddressFamily any
        #ListenAddress 0.0.0.0
        #ListenAddress ::
        #HostKey /etc/ssh/ssh_host_rsa_key
        #HostKey /etc/ssh/ssh_host_ecdsa_key
        #HostKey /etc/ssh/ssh_host_ed25519_key
        # Ciphers and keying
        #RekeyLimit default none
        # Logging
        #SyslogFacility AUTH
        #LogLevel INFO
        # Authentication:
        #LoginGraceTime 2m
        #PermitRootLogin prohibit-password
        #StrictModes yes
        #MaxAuthTries 6
        #MaxSessions 10
        """
    )
    with salt.utils.files.fopen(name, "w+") as handle:
        handle.write(sshd_config)

    state_kwargs = {
        "name": name,
        "key": "permitrootlogin",
        "value": "no",
        "separator": " ",
        "uncomment": " #",
        "key_ignore_case": True,
    }
    ret = self.run_state("file.keyvalue", **state_kwargs)

    with salt.utils.files.fopen(name, "r") as handle:
        file_contents = handle.read()

    # The commented-out default must be gone, replaced by the new value
    for unwanted in ("#PermitRootLogin", "prohibit-password"):
        self.assertNotIn(unwanted, file_contents)
    self.assertIn("PermitRootLogin no", file_contents)
    self.assertSaltTrueReturn(ret)
- @pytest.mark.windows_whitelisted
- class BlockreplaceTest(ModuleCase, SaltReturnAssertsMixin):
- marker_start = "# start"
- marker_end = "# end"
- content = dedent(
- """\
- Line 1 of block
- Line 2 of block
- """
- )
- without_block = dedent(
- """\
- Hello world!
- # comment here
- """
- )
- with_non_matching_block = dedent(
- """\
- Hello world!
- # start
- No match here
- # end
- # comment here
- """
- )
- with_non_matching_block_and_marker_end_not_after_newline = dedent(
- """\
- Hello world!
- # start
- No match here# end
- # comment here
- """
- )
- with_matching_block = dedent(
- """\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block
- # end
- # comment here
- """
- )
- with_matching_block_and_extra_newline = dedent(
- """\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block
- # end
- # comment here
- """
- )
- with_matching_block_and_marker_end_not_after_newline = dedent(
- """\
- Hello world!
- # start
- Line 1 of block
- Line 2 of block# end
- # comment here
- """
- )
- content_explicit_posix_newlines = "Line 1 of block\n" "Line 2 of block\n"
- content_explicit_windows_newlines = "Line 1 of block\r\n" "Line 2 of block\r\n"
- without_block_explicit_posix_newlines = "Hello world!\n\n" "# comment here\n"
- without_block_explicit_windows_newlines = (
- "Hello world!\r\n\r\n" "# comment here\r\n"
- )
- with_block_prepended_explicit_posix_newlines = (
- "# start\n"
- "Line 1 of block\n"
- "Line 2 of block\n"
- "# end\n"
- "Hello world!\n\n"
- "# comment here\n"
- )
- with_block_prepended_explicit_windows_newlines = (
- "# start\r\n"
- "Line 1 of block\r\n"
- "Line 2 of block\r\n"
- "# end\r\n"
- "Hello world!\r\n\r\n"
- "# comment here\r\n"
- )
- with_block_appended_explicit_posix_newlines = (
- "Hello world!\n\n"
- "# comment here\n"
- "# start\n"
- "Line 1 of block\n"
- "Line 2 of block\n"
- "# end\n"
- )
- with_block_appended_explicit_windows_newlines = (
- "Hello world!\r\n\r\n"
- "# comment here\r\n"
- "# start\r\n"
- "Line 1 of block\r\n"
- "Line 2 of block\r\n"
- "# end\r\n"
- )
@staticmethod
def _write(dest, content):
    """
    Write ``content`` to the file ``dest`` in binary mode, after passing
    it through ``salt.utils.stringutils.to_bytes``.
    """
    with salt.utils.files.fopen(dest, "wb") as handle:
        payload = salt.utils.stringutils.to_bytes(content)
        handle.write(payload)
@staticmethod
def _read(src):
    """
    Read the file ``src`` in binary mode and return its contents passed
    through ``salt.utils.stringutils.to_unicode``.
    """
    with salt.utils.files.fopen(src, "rb") as handle:
        raw = handle.read()
        return salt.utils.stringutils.to_unicode(raw)
@with_tempfile()
def test_prepend(self, name):
    """
    blockreplace with prepend_if_not_found=True on a file that lacks the
    block. The same result is expected whether or not the content ends in
    a newline, and a second identical run must report no changes.
    """
    expected = (
        self.marker_start
        + os.linesep
        + self.content
        + self.marker_end
        + os.linesep
        + self.without_block
    )

    # Pass 1: content ends in newline; pass 2: it does not
    for content in (self.content, self.content.rstrip("\r\n")):
        self._write(name, self.without_block)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_append_newline(self, name):
    """
    blockreplace with prepend_if_not_found=True and append_newline=True on
    a file that lacks the block. Each scenario is run twice; the second
    run must report no changes.
    """
    head = self.marker_start + os.linesep
    tail = self.marker_end + os.linesep + self.without_block

    scenarios = (
        # Pass 1: content ends in newline -> an extra line separator is
        # expected before the end marker
        (self.content, head + self.content + os.linesep + tail),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), head + self.content + tail),
    )

    for content, expected in scenarios:
        self._write(name, self.without_block)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_no_append_newline(self, name):
    """
    blockreplace with prepend_if_not_found=True and append_newline=False
    on a file that lacks the block. Each scenario is run twice; the second
    run must report no changes.
    """
    head = self.marker_start + os.linesep
    tail = self.marker_end + os.linesep + self.without_block
    stripped = self.content.rstrip("\r\n")

    scenarios = (
        # Pass 1: content ends in newline
        (self.content, head + self.content + tail),
        # Pass 2: content does not end in newline -> the end marker is
        # expected directly after the content
        (stripped, head + stripped + tail),
    )

    for content, expected in scenarios:
        self._write(name, self.without_block)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append(self, name):
    """
    blockreplace with append_if_not_found=True on a file that lacks the
    block. The same result is expected whether or not the content ends in
    a newline, and a second identical run must report no changes.
    """
    expected = (
        self.without_block
        + self.marker_start
        + os.linesep
        + self.content
        + self.marker_end
        + os.linesep
    )

    # Pass 1: content ends in newline; pass 2: it does not
    for content in (self.content, self.content.rstrip("\r\n")):
        self._write(name, self.without_block)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_append_newline(self, name):
    """
    blockreplace with append_if_not_found=True and append_newline=True on
    a file that lacks the block. Each scenario is run twice; the second
    run must report no changes.
    """
    head = self.without_block + self.marker_start + os.linesep
    tail = self.marker_end + os.linesep

    scenarios = (
        # Pass 1: content ends in newline -> an extra line separator is
        # expected before the end marker
        (self.content, head + self.content + os.linesep + tail),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), head + self.content + tail),
    )

    for content, expected in scenarios:
        self._write(name, self.without_block)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_no_append_newline(self, name):
    """
    blockreplace with append_if_not_found=True and append_newline=False on
    a file that lacks the block. Each scenario is run twice; the second
    run must report no changes.
    """
    head = self.without_block + self.marker_start + os.linesep
    tail = self.marker_end + os.linesep
    stripped = self.content.rstrip("\r\n")

    scenarios = (
        # Pass 1: content ends in newline
        (self.content, head + self.content + tail),
        # Pass 2: content does not end in newline -> the end marker is
        # expected directly after the content
        (stripped, head + stripped + tail),
    )

    for content, expected in scenarios:
        self._write(name, self.without_block)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
                append_newline=False,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_prepend_auto_line_separator(self, name):
    """
    Line separator auto-detection when prepending the block: the written
    block is expected to use the target file's newline style even when
    the supplied content uses the other style.
    """
    # (initial file contents, block content, expected file contents)
    scenarios = (
        # POSIX newlines to Windows newlines
        (
            self.without_block_explicit_windows_newlines,
            self.content_explicit_posix_newlines,
            self.with_block_prepended_explicit_windows_newlines,
        ),
        # Windows newlines to POSIX newlines
        (
            self.without_block_explicit_posix_newlines,
            self.content_explicit_windows_newlines,
            self.with_block_prepended_explicit_posix_newlines,
        ),
    )

    for initial, content, expected in scenarios:
        self._write(name, initial)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                prepend_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_append_auto_line_separator(self, name):
    """
    Line separator auto-detection when appending the block: the written
    block is expected to use the target file's newline style even when
    the supplied content uses the other style.
    """
    # (initial file contents, block content, expected file contents)
    scenarios = (
        # POSIX newlines to Windows newlines
        (
            self.without_block_explicit_windows_newlines,
            self.content_explicit_posix_newlines,
            self.with_block_appended_explicit_windows_newlines,
        ),
        # Windows newlines to POSIX newlines
        (
            self.without_block_explicit_posix_newlines,
            self.content_explicit_windows_newlines,
            self.with_block_appended_explicit_posix_newlines,
        ),
    )

    for initial, content, expected in scenarios:
        self._write(name, initial)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_if_not_found=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block(self, name):
    """
    blockreplace on a file whose marked block exists but holds different
    contents. The same result is expected whether or not the content ends
    in a newline, and a second identical run must report no changes.
    """
    # Pass 1: content ends in newline; pass 2: it does not
    for content in (self.content, self.content.rstrip("\r\n")):
        self._write(name, self.with_non_matching_block)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), self.with_matching_block)
@with_tempfile()
def test_non_matching_block_append_newline(self, name):
    """
    blockreplace with append_newline=True on a file whose marked block
    exists but holds different contents. Each scenario is run twice; the
    second run must report no changes.
    """
    scenarios = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block_and_extra_newline),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.with_matching_block),
    )

    for content, expected in scenarios:
        self._write(name, self.with_non_matching_block)
        # First run must change the file, the re-run must be a no-op
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state(
                "file.blockreplace",
                name=name,
                content=content,
                marker_start=self.marker_start,
                marker_end=self.marker_end,
                append_newline=True,
            )
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_no_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block exists but holds
    different contents, with append_newline explicitly set to False.
    """
    # (content handed to the state, file contents expected afterwards)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block),
        # Pass 2: content does not end in newline
        (
            self.content.rstrip("\r\n"),
            self.with_matching_block_and_marker_end_not_after_newline,
        ),
    )
    for content, expected in scenarios:
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
            "append_newline": False,
        }
        self._write(name, self.with_non_matching_block)
        # The first run has to modify the file, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline(self, name):
    """
    Run blockreplace against a file whose marked block holds different
    contents and whose marker_end is not directly preceded by a newline.
    append_newline is left at its default.
    """
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    expected = self.with_matching_block
    # Pass 1: content ends in newline
    # Pass 2: content does not end in newline
    for content in (self.content, self.content.rstrip("\r\n")):
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
        }
        self._write(name, initial)
        # The first run has to modify the file, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block holds different
    contents and whose marker_end is not directly preceded by a newline,
    with append_newline explicitly set to True.
    """
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    # (content handed to the state, file contents expected afterwards)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block_and_extra_newline),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.with_matching_block),
    )
    for content, expected in scenarios:
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
            "append_newline": True,
        }
        self._write(name, initial)
        # The first run has to modify the file, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_non_matching_block_and_marker_not_after_newline_no_append_newline(
    self, name
):
    """
    Run blockreplace against a file whose marked block holds different
    contents and whose marker_end is not directly preceded by a newline,
    with append_newline explicitly set to False.
    """
    initial = self.with_non_matching_block_and_marker_end_not_after_newline
    # (content handed to the state, file contents expected afterwards)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block),
        # Pass 2: content does not end in newline
        (
            self.content.rstrip("\r\n"),
            self.with_matching_block_and_marker_end_not_after_newline,
        ),
    )
    for content, expected in scenarios:
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
            "append_newline": False,
        }
        self._write(name, initial)
        # The first run has to modify the file, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content. The state must never report changes and the file
    must stay as it was.
    """
    unchanged = self.with_matching_block
    # Pass 1: content ends in newline
    # Pass 2: content does not end in newline
    for content in (self.content, self.content.rstrip("\r\n")):
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
        }
        self._write(name, unchanged)
        # Initial run followed by one re-run; neither may change anything.
        for _ in range(2):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            self.assertFalse(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), unchanged)
@with_tempfile()
def test_matching_block_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content, with append_newline explicitly set to True. An extra
    newline is expected when the content ends in a newline, and none when
    it does not.
    """
    # (content, assertion for the first run's changes, expected file contents)
    scenarios = (
        # Pass 1: content ends in newline
        (
            self.content,
            self.assertTrue,
            self.with_matching_block_and_extra_newline,
        ),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.assertFalse, self.with_matching_block),
    )
    for content, first_check, expected in scenarios:
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
            "append_newline": True,
        }
        self._write(name, self.with_matching_block)
        # The re-run must never report changes, whatever the first run did.
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_no_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content, with append_newline explicitly set to False. When the
    content does not end in a newline, the marker_end is expected to end up
    not directly preceded by a newline.
    """
    # (content, assertion for the first run's changes, expected file contents)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content, self.assertFalse, self.with_matching_block),
        # Pass 2: content does not end in newline
        (
            self.content.rstrip("\r\n"),
            self.assertTrue,
            self.with_matching_block_and_marker_end_not_after_newline,
        ),
    )
    for content, first_check, expected in scenarios:
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
            "append_newline": False,
        }
        self._write(name, self.with_matching_block)
        # The re-run must never report changes, whatever the first run did.
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content but whose marker_end is not directly preceded by a
    newline. append_newline is left at its default.
    """
    initial = self.with_matching_block_and_marker_end_not_after_newline
    expected = self.with_matching_block
    # Pass 1: content ends in newline
    # Pass 2: content does not end in newline
    for content in (self.content, self.content.rstrip("\r\n")):
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
        }
        self._write(name, initial)
        # The first run has to modify the file, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content but whose marker_end is not directly preceded by a
    newline, with append_newline explicitly set to True. An extra newline
    is expected when the content ends in a newline, and none when it does
    not.
    """
    initial = self.with_matching_block_and_marker_end_not_after_newline
    # (content handed to the state, file contents expected afterwards)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content, self.with_matching_block_and_extra_newline),
        # Pass 2: content does not end in newline
        (self.content.rstrip("\r\n"), self.with_matching_block),
    )
    for content, expected in scenarios:
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
            "append_newline": True,
        }
        self._write(name, initial)
        # The first run has to modify the file, the re-run must not.
        for check_changes in (self.assertTrue, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_matching_block_and_marker_not_after_newline_no_append_newline(self, name):
    """
    Run blockreplace against a file whose marked block already holds the
    requested content but whose marker_end is not directly preceded by a
    newline, with append_newline explicitly set to False.
    """
    initial = self.with_matching_block_and_marker_end_not_after_newline
    # (content, assertion for the first run's changes, expected file contents)
    scenarios = (
        # Pass 1: content ends in newline
        (self.content, self.assertTrue, self.with_matching_block),
        # Pass 2: content does not end in newline, file already matches
        (self.content.rstrip("\r\n"), self.assertFalse, initial),
    )
    for content, first_check, expected in scenarios:
        state_kwargs = {
            "name": name,
            "content": content,
            "marker_start": self.marker_start,
            "marker_end": self.marker_end,
            "append_newline": False,
        }
        self._write(name, initial)
        # The re-run must never report changes, whatever the first run did.
        for check_changes in (first_check, self.assertFalse):
            ret = self.run_state("file.blockreplace", **state_kwargs)
            self.assertSaltTrueReturn(ret)
            check_changes(ret[next(iter(ret))]["changes"])
            self.assertEqual(self._read(name), expected)
@with_tempfile()
def test_issue_49043(self, name):
    """
    Apply the ``issue-49043`` SLS to a temp file (its path is passed in via
    pillar) and verify the diff reported by the blockreplace state, which
    contains non-ASCII characters.
    """
    ret = self.run_function("state.sls", mods="issue-49043", pillar={"name": name})
    # Diagnostic output only: a normal return value is not an error, so it
    # is logged at debug level.
    log.debug("ret = %r", ret)
    expected_diff = "--- \n+++ \n@@ -0,0 +1,3 @@\n"
    expected_diff += dedent(
        """\
        +#-- start managed zone --
        +äöü
        +#-- end managed zone --
        """
    )
    job = "file_|-somefile-blockreplace_|-{}_|-blockreplace".format(name)
    self.assertEqual(ret[job]["changes"]["diff"], expected_diff)
@pytest.mark.windows_whitelisted
class RemoteFileTest(ModuleCase, SaltReturnAssertsMixin):
    """
    Uses a local tornado webserver to test http(s) file.managed states with and
    without skip_verify
    """

    @classmethod
    def setUpClass(cls):
        """
        Start the webserver and record the source URL and its expected hash
        """
        cls.webserver = Webserver()
        cls.webserver.start()
        cls.source = cls.webserver.url("grail/scene33")
        if IS_WINDOWS:
            # CRLF vs LF causes a different hash on windows
            cls.source_hash = "21438b3d5fd2c0028bcab92f7824dc69"
        else:
            cls.source_hash = "d2feb3beb323c79fc7a0f44f1408b4a3"

    @classmethod
    def tearDownClass(cls):
        """
        Stop the webserver started in setUpClass
        """
        cls.webserver.stop()

    @with_tempfile(create=False)
    def setUp(self, name):  # pylint: disable=arguments-differ
        """
        Remember the temp file path handed in by the with_tempfile decorator
        """
        self.name = name

    def tearDown(self):
        """
        Remove the managed file; a file that does not exist is not an error
        """
        try:
            os.remove(self.name)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                # Anything other than "no such file" is unexpected, so let
                # the original exception propagate with its traceback.
                raise

    def run_state(self, *args, **kwargs):  # pylint: disable=arguments-differ
        """
        Run the state via the parent class and log its return at debug level
        """
        ret = super(RemoteFileTest, self).run_state(*args, **kwargs)
        log.debug("ret = %s", ret)
        return ret

    def test_file_managed_http_source_no_hash(self):
        """
        Test a remote file with no hash
        """
        ret = self.run_state(
            "file.managed", name=self.name, source=self.source, skip_verify=False
        )
        # This should fail because no hash was provided
        self.assertSaltFalseReturn(ret)

    def test_file_managed_http_source(self):
        """
        Test a remote file with a source_hash provided
        """
        ret = self.run_state(
            "file.managed",
            name=self.name,
            source=self.source,
            source_hash=self.source_hash,
            skip_verify=False,
        )
        self.assertSaltTrueReturn(ret)

    def test_file_managed_http_source_skip_verify(self):
        """
        Test a remote file using skip_verify
        """
        ret = self.run_state(
            "file.managed", name=self.name, source=self.source, skip_verify=True
        )
        self.assertSaltTrueReturn(ret)

    def test_file_managed_keep_source_false_http(self):
        """
        This test ensures that we properly clean the cached file if keep_source
        is set to False, for source files using an http:// URL
        """
        # Run the state
        ret = self.run_state(
            "file.managed",
            name=self.name,
            source=self.source,
            source_hash=self.source_hash,
            keep_source=False,
        )
        ret = ret[next(iter(ret))]
        assert ret["result"] is True

        # Now make sure that the file is not cached
        result = self.run_function("cp.is_cached", [self.source])
        assert result == "", "File is still cached at {0}".format(result)
- @skipIf(not salt.utils.path.which("patch"), "patch is not installed")
- @pytest.mark.windows_whitelisted
- class PatchTest(ModuleCase, SaltReturnAssertsMixin):
def _check_patch_version(self, min_version):
    """
    Skip the running test when the installed patch is older than required.

    min_version
        Minimum patch version, as a version string such as "2.6"
    """
    banner = self.run_function("cmd.run", ["patch --version"]).splitlines()[0]
    # Take the last word of the banner line as the version. A fixed index
    # of 1 only works for a two-word banner such as "patch 2.5.9"; with a
    # banner such as "GNU patch 2.7.6" it would select the word "patch" and
    # the comparison below would no longer be about the version at all.
    version = banner.split()[-1]
    if _LooseVersion(version) < _LooseVersion(min_version):
        self.skipTest(
            "Minimum patch version required: {0}. "
            "Patch version installed: {1}".format(min_version, version)
        )
@classmethod
def setUpClass(cls):
    """
    Start the webserver and precompute, for the numbers, math and "all"
    patches and their jinja templates: file name, relative path, salt://
    URL, webserver URL and hash of the file on disk.
    """
    cls.webserver = Webserver()
    cls.webserver.start()

    patches_dir = os.path.join(RUNTIME_VARS.FILES, "file", "base", "patches")

    def _relative(file_name):
        # Path of a patch file relative to the fileserver root
        return "patches/" + file_name

    def _salt_url(relative_path):
        return "salt://" + relative_path

    def _file_hash(file_name):
        return salt.utils.hashutils.get_hash(os.path.join(patches_dir, file_name))

    # File names
    cls.numbers_patch_name = "numbers.patch"
    cls.math_patch_name = "math.patch"
    cls.all_patch_name = "all.patch"
    cls.numbers_patch_template_name = cls.numbers_patch_name + ".jinja"
    cls.math_patch_template_name = cls.math_patch_name + ".jinja"
    cls.all_patch_template_name = cls.all_patch_name + ".jinja"

    # Relative paths
    cls.numbers_patch_path = _relative(cls.numbers_patch_name)
    cls.math_patch_path = _relative(cls.math_patch_name)
    cls.all_patch_path = _relative(cls.all_patch_name)
    cls.numbers_patch_template_path = _relative(cls.numbers_patch_template_name)
    cls.math_patch_template_path = _relative(cls.math_patch_template_name)
    cls.all_patch_template_path = _relative(cls.all_patch_template_name)

    # salt:// URLs
    cls.numbers_patch = _salt_url(cls.numbers_patch_path)
    cls.math_patch = _salt_url(cls.math_patch_path)
    cls.all_patch = _salt_url(cls.all_patch_path)
    cls.numbers_patch_template = _salt_url(cls.numbers_patch_template_path)
    cls.math_patch_template = _salt_url(cls.math_patch_template_path)
    cls.all_patch_template = _salt_url(cls.all_patch_template_path)

    # URLs served by the local webserver
    cls.numbers_patch_http = cls.webserver.url(cls.numbers_patch_path)
    cls.math_patch_http = cls.webserver.url(cls.math_patch_path)
    cls.all_patch_http = cls.webserver.url(cls.all_patch_path)
    cls.numbers_patch_template_http = cls.webserver.url(
        cls.numbers_patch_template_path
    )
    cls.math_patch_template_http = cls.webserver.url(cls.math_patch_template_path)
    cls.all_patch_template_http = cls.webserver.url(cls.all_patch_template_path)

    # Hashes of the files on disk
    cls.numbers_patch_hash = _file_hash(cls.numbers_patch_name)
    cls.math_patch_hash = _file_hash(cls.math_patch_name)
    cls.all_patch_hash = _file_hash(cls.all_patch_name)
    cls.numbers_patch_template_hash = _file_hash(cls.numbers_patch_template_name)
    cls.math_patch_template_hash = _file_hash(cls.math_patch_template_name)
    cls.all_patch_template_hash = _file_hash(cls.all_patch_template_name)

    cls.context = {"two": "two", "ten": 10}
@classmethod
def tearDownClass(cls):
    """
    Stop the webserver started in setUpClass
    """
    server = cls.webserver
    server.stop()
def setUp(self):
    """
    Create a new unpatched set of files
    """
    self.base_dir = tempfile.mkdtemp(dir=RUNTIME_VARS.TMP)
    # Register the cleanup right away. unittest still runs cleanups when
    # setUp raises, so a failure further down no longer leaks the directory.
    self.addCleanup(shutil.rmtree, self.base_dir, ignore_errors=True)
    os.makedirs(os.path.join(self.base_dir, "foo", "bar"))
    self.numbers_file = os.path.join(self.base_dir, "foo", "numbers.txt")
    self.math_file = os.path.join(self.base_dir, "foo", "bar", "math.txt")
    with salt.utils.files.fopen(self.numbers_file, "w") as fp_:
        fp_.write(
            textwrap.dedent(
                """\
                one
                two
                three
                1
                2
                3
                """
            )
        )
    with salt.utils.files.fopen(self.math_file, "w") as fp_:
        fp_.write(
            textwrap.dedent(
                """\
                Five plus five is ten
                Four squared is sixteen
                """
            )
        )
def test_patch_single_file(self):
    """
    Apply a patch to a single file with file.patch, then apply it again
    """
    patch_kwargs = {"name": self.numbers_file, "source": self.numbers_patch}

    # Initial run applies the patch
    first_run = self.run_state("file.patch", **patch_kwargs)
    self.assertSaltTrueReturn(first_run)
    first_entry = first_run[next(iter(first_run))]
    self.assertEqual(first_entry["comment"], "Patch successfully applied")

    # The re-run should succeed, report that the patch was already applied
    # and make no changes.
    second_run = self.run_state("file.patch", **patch_kwargs)
    self.assertSaltTrueReturn(second_run)
    second_entry = second_run[next(iter(second_run))]
    self.assertEqual(second_entry["comment"], "Patch was already applied")
    self.assertEqual(second_entry["changes"], {})
def test_patch_directory(self):
    """
    Apply a patch spanning multiple files to a directory with file.patch,
    then apply it again
    """
    self._check_patch_version("2.6")
    patch_kwargs = {"name": self.base_dir, "source": self.all_patch, "strip": 1}

    # Initial run applies the patch
    first_run = self.run_state("file.patch", **patch_kwargs)
    self.assertSaltTrueReturn(first_run)
    first_entry = first_run[next(iter(first_run))]
    self.assertEqual(first_entry["comment"], "Patch successfully applied")

    # The re-run should succeed, report that the patch was already applied
    # and make no changes.
    second_run = self.run_state("file.patch", **patch_kwargs)
    self.assertSaltTrueReturn(second_run)
    second_entry = second_run[next(iter(second_run))]
    self.assertEqual(second_entry["comment"], "Patch was already applied")
    self.assertEqual(second_entry["changes"], {})
def test_patch_strip_parsing(self):
    """
    Test that -p/--strip is parsed successfully when passed via the options
    argument
    """
    self._check_patch_version("2.6")
    target = {"name": self.base_dir, "source": self.all_patch}

    # Run the state using -p1
    ret = self.run_state("file.patch", options="-p1", **target)
    self.assertSaltTrueReturn(ret)
    entry = ret[next(iter(ret))]
    self.assertEqual(entry["comment"], "Patch successfully applied")

    # Re-run the state with each long spelling of the option; both must
    # find the patch already applied and make no changes.
    for strip_option in ("--strip=1", "--strip 1"):
        ret = self.run_state("file.patch", options=strip_option, **target)
        self.assertSaltTrueReturn(ret)
        entry = ret[next(iter(ret))]
        self.assertEqual(entry["comment"], "Patch was already applied")
        self.assertEqual(entry["changes"], {})
def test_patch_saltenv(self):
    """
    Test that we attempt to download the patch from a non-base saltenv
    """
    # The "prod" environment holds no such patch file, so the state is
    # expected to fail. The failure message is what shows that an
    # environment other than base was searched.
    expected_comment = "Source file {0} not found in saltenv 'prod'".format(
        self.math_patch
    )
    ret = self.run_state(
        "file.patch", name=self.math_file, source=self.math_patch, saltenv="prod"
    )
    self.assertSaltFalseReturn(ret)
    entry = ret[next(iter(ret))]
    self.assertEqual(entry["comment"], expected_comment)
def test_patch_single_file_failure(self):
    """
    Test file.patch using a patch applied to a single file. This tests a
    failed patch.
    """
    import re

    # Empty the file to ensure that the patch doesn't apply cleanly
    with salt.utils.files.fopen(self.numbers_file, "w"):
        pass

    ret = self.run_state(
        "file.patch", name=self.numbers_file, source=self.numbers_patch
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn("Patch would not apply cleanly", ret["comment"])

    # Test the reject_file option and ensure that the rejects are written
    # to the path specified.
    reject_file = salt.utils.files.mkstemp()
    # mkstemp creates the file on disk; remove it again once the test ends
    self.addCleanup(salt.utils.files.safe_rm, reject_file)
    ret = self.run_state(
        "file.patch",
        name=self.numbers_file,
        source=self.numbers_patch,
        reject_file=reject_file,
        strip=1,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn("Patch would not apply cleanly", ret["comment"])
    # Escape the path so that it is matched literally; characters such as
    # backslashes would otherwise be interpreted as regex syntax.
    self.assertRegex(
        ret["comment"],
        "saving rejects to (file )?{0}".format(re.escape(reject_file)),
    )
def test_patch_directory_failure(self):
    """
    Test file.patch using a patch applied to a directory, with changes
    spanning multiple files. This tests a failed patch.
    """
    import re

    # Empty the file to ensure that the patch doesn't apply
    with salt.utils.files.fopen(self.math_file, "w"):
        pass

    ret = self.run_state(
        "file.patch", name=self.base_dir, source=self.all_patch, strip=1
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn("Patch would not apply cleanly", ret["comment"])

    # Test the reject_file option and ensure that the rejects are written
    # to the path specified.
    reject_file = salt.utils.files.mkstemp()
    # mkstemp creates the file on disk; remove it again once the test ends
    self.addCleanup(salt.utils.files.safe_rm, reject_file)
    ret = self.run_state(
        "file.patch",
        name=self.base_dir,
        source=self.all_patch,
        reject_file=reject_file,
        strip=1,
    )
    self.assertSaltFalseReturn(ret)
    ret = ret[next(iter(ret))]
    self.assertIn("Patch would not apply cleanly", ret["comment"])
    # Escape the path so that it is matched literally; characters such as
    # backslashes would otherwise be interpreted as regex syntax.
    self.assertRegex(
        ret["comment"],
        "saving rejects to (file )?{0}".format(re.escape(reject_file)),
    )
def test_patch_single_file_remote_source(self):
    """
    Apply a patch fetched from a remote source to a single file with
    file.patch, covering the source_hash and skip_verify handling.
    """
    target = {"name": self.math_file, "source": self.math_patch_http}

    # With neither a source_hash nor skip_verify=True the state should fail
    # with a message about the source_hash
    no_hash = self.run_state("file.patch", **target)
    self.assertSaltFalseReturn(no_hash)
    no_hash_entry = no_hash[next(iter(no_hash))]
    self.assertIn("Unable to verify upstream hash", no_hash_entry["comment"])

    # Supplying the source hash should make the state succeed
    with_hash = self.run_state(
        "file.patch", source_hash=self.math_patch_hash, **target
    )
    self.assertSaltTrueReturn(with_hash)
    with_hash_entry = with_hash[next(iter(with_hash))]
    self.assertEqual(with_hash_entry["comment"], "Patch successfully applied")

    # Finally run with no hash and skip_verify=True to test skipping hash
    # verification; the patch is already applied by now
    skipped = self.run_state("file.patch", skip_verify=True, **target)
    self.assertSaltTrueReturn(skipped)
    skipped_entry = skipped[next(iter(skipped))]
    self.assertEqual(skipped_entry["comment"], "Patch was already applied")
    self.assertEqual(skipped_entry["changes"], {})
def test_patch_directory_remote_source(self):
    """
    Apply a multi-file patch to a directory with the patch file coming
    from a remote source. A run without any hash must fail, a run with
    ``source_hash`` must apply the patch, and a run with
    ``skip_verify=True`` must find the patch already applied.
    """
    # NOTE(review): presumably gates the test on the installed patch
    # version -- confirm against _check_patch_version.
    self._check_patch_version("2.6")

    def patch_base_dir(**verification):
        # Every run targets the same directory with the same source and
        # strip level; only the verification keyword arguments differ.
        return self.run_state(
            "file.patch",
            name=self.base_dir,
            source=self.all_patch_http,
            strip=1,
            **verification
        )

    # No source_hash and no skip_verify: expect a failure that mentions
    # the missing hash.
    outcome = patch_base_dir()
    self.assertSaltFalseReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertIn("Unable to verify upstream hash", state_ret["comment"])

    # Supplying the source hash lets the patch go through.
    outcome = patch_base_dir(source_hash=self.all_patch_hash)
    self.assertSaltTrueReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "Patch successfully applied")

    # Skipping verification also works; by now the patch is already
    # applied, so no changes are reported.
    outcome = patch_base_dir(skip_verify=True)
    self.assertSaltTrueReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "Patch was already applied")
    self.assertEqual(state_ret["changes"], {})
def test_patch_single_file_template(self):
    """
    Apply a jinja-templated patch to a single file, then run the same
    state again to confirm the patch is recognised as already applied.
    """
    state_kwargs = dict(
        name=self.numbers_file,
        source=self.numbers_patch_template,
        template="jinja",
        context=self.context,
    )

    # First run: the rendered patch should apply.
    first_run = self.run_state("file.patch", **state_kwargs)
    self.assertSaltTrueReturn(first_run)
    first_ret = first_run[next(iter(first_run))]
    self.assertEqual(first_ret["comment"], "Patch successfully applied")

    # Second run with identical arguments: still a success, but reported
    # as already applied and without any changes.
    second_run = self.run_state("file.patch", **state_kwargs)
    self.assertSaltTrueReturn(second_run)
    second_ret = second_run[next(iter(second_run))]
    self.assertEqual(second_ret["comment"], "Patch was already applied")
    self.assertEqual(second_ret["changes"], {})
def test_patch_directory_template(self):
    """
    Apply a jinja-templated, multi-file patch to a directory, then run the
    same state again to confirm the patch is recognised as already
    applied.
    """
    self._check_patch_version("2.6")

    # The state is run twice with identical arguments. The first pass must
    # apply the patch; the second must report it as already applied and
    # must not report any changes.
    expected_comments = (
        "Patch successfully applied",
        "Patch was already applied",
    )
    for is_rerun, expected_comment in enumerate(expected_comments):
        outcome = self.run_state(
            "file.patch",
            name=self.base_dir,
            source=self.all_patch_template,
            template="jinja",
            context=self.context,
            strip=1,
        )
        self.assertSaltTrueReturn(outcome)
        state_ret = outcome[next(iter(outcome))]
        self.assertEqual(state_ret["comment"], expected_comment)
        if is_rerun:
            # Only the re-run is checked for an empty changes dict.
            self.assertEqual(state_ret["changes"], {})
def test_patch_single_file_remote_source_template(self):
    """
    Apply a jinja-templated patch to a single file with the patch file
    coming from a remote source, covering the no-hash, ``source_hash`` and
    ``skip_verify=True`` verification paths.
    """
    # Everything except the verification options is the same on each run.
    common = dict(
        name=self.math_file,
        source=self.math_patch_template_http,
        template="jinja",
        context=self.context,
    )

    # Without source_hash or skip_verify the state has to fail with a
    # message about the hash.
    outcome = self.run_state("file.patch", **common)
    self.assertSaltFalseReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertIn("Unable to verify upstream hash", state_ret["comment"])

    # Providing the source hash makes the run succeed.
    outcome = self.run_state(
        "file.patch", source_hash=self.math_patch_template_hash, **common
    )
    self.assertSaltTrueReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "Patch successfully applied")

    # No hash, but verification is skipped: the patch is already in place
    # so the state succeeds without changes.
    outcome = self.run_state("file.patch", skip_verify=True, **common)
    self.assertSaltTrueReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "Patch was already applied")
    self.assertEqual(state_ret["changes"], {})
def test_patch_directory_remote_source_template(self):
    """
    Apply a jinja-templated, multi-file patch to a directory with the
    patch file coming from a remote source, covering the no-hash,
    ``source_hash`` and ``skip_verify=True`` verification paths.
    """
    self._check_patch_version("2.6")

    def patch_base_dir(**verification):
        # Fixed part of the state call; callers add only the hash or
        # skip_verify options under test.
        return self.run_state(
            "file.patch",
            name=self.base_dir,
            source=self.all_patch_template_http,
            template="jinja",
            context=self.context,
            strip=1,
            **verification
        )

    # Neither source_hash nor skip_verify: the run must fail and mention
    # the hash.
    outcome = patch_base_dir()
    self.assertSaltFalseReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertIn("Unable to verify upstream hash", state_ret["comment"])

    # With the source hash the patch should be applied.
    outcome = patch_base_dir(source_hash=self.all_patch_template_hash)
    self.assertSaltTrueReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "Patch successfully applied")

    # With verification skipped the state succeeds again, this time
    # reporting that there was nothing left to do.
    outcome = patch_base_dir(skip_verify=True)
    self.assertSaltTrueReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "Patch was already applied")
    self.assertEqual(state_ret["changes"], {})
def test_patch_test_mode(self):
    """
    Test file.patch using test=True

    Covers three situations: a patch that would apply (None result), a
    patch that is already applied (True result) and a patch that cannot
    apply cleanly (False result).
    """

    def run_patch(**extra):
        # All runs use the same target file and patch; callers add
        # test=True where a dry run is wanted.
        return self.run_state(
            "file.patch",
            name=self.numbers_file,
            source=self.numbers_patch,
            **extra
        )

    # Dry run against the unpatched file: nothing is applied, the result
    # is None and the pending changes are reported.
    outcome = run_patch(test=True)
    self.assertSaltNoneReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "The patch would be applied")
    self.assertTrue(state_ret["changes"])

    # Apply the patch for real. This sets up the next step, which checks
    # that test=True on an already-applied patch yields True rather than
    # None.
    outcome = run_patch()
    self.assertSaltTrueReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "Patch successfully applied")
    self.assertTrue(state_ret["changes"])

    # Dry run again. The pre-check happens before the __opts__['test']
    # check, so the outcome should be the same True result we would get
    # for an already-patched file *without* test=True.
    outcome = run_patch(test=True)
    self.assertSaltTrueReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertEqual(state_ret["comment"], "Patch was already applied")
    self.assertEqual(state_ret["changes"], {})

    # Empty the file to ensure that the patch doesn't apply cleanly
    with salt.utils.files.fopen(self.numbers_file, "w"):
        pass

    # One more dry run. As above, we expect to return before reaching the
    # __opts__['test'] check; this time with a False result, because it is
    # already known that the patch will not apply cleanly.
    outcome = run_patch(test=True)
    self.assertSaltFalseReturn(outcome)
    state_ret = outcome[next(iter(outcome))]
    self.assertIn("Patch would not apply cleanly", state_ret["comment"])
    self.assertEqual(state_ret["changes"], {})
# Path of the scratch file that WinFileTest creates before, and removes
# after, every test.
WIN_TEST_FILE = "c:/testfile"


@destructiveTest
@skipIf(not IS_WINDOWS, "windows test only")
@pytest.mark.windows_whitelisted
class WinFileTest(ModuleCase):
    """
    Test for the file state on Windows

    setUp manages ``WIN_TEST_FILE`` with the contents "Only a test" and
    tearDown runs ``file.absent`` on it, so each test starts from the same
    file.
    """

    def _assert_state_succeeded(self, ret):
        """
        Fail unless every state in a ``run_state`` return reports success.

        ``assertTrue(ret)`` alone only proves the return is non-empty,
        which also holds when the state failed, so the per-state result is
        inspected instead.

        NOTE(review): relies on the standard Salt state return layout
        (a dict of state id -> dict with ``result`` and ``comment``).
        """
        self.assertIsInstance(ret, dict)
        self.assertTrue(ret)
        for state_id, state_ret in ret.items():
            self.assertTrue(
                state_ret["result"],
                msg="state {0} failed: {1}".format(
                    state_id, state_ret.get("comment")
                ),
            )

    def setUp(self):
        self.run_state(
            "file.managed", name=WIN_TEST_FILE, makedirs=True, contents="Only a test"
        )

    def tearDown(self):
        self.run_state("file.absent", name=WIN_TEST_FILE)

    def test_file_managed(self):
        """
        Test file.managed on Windows
        """
        # setUp already ran file.managed; confirm the file is really there.
        ret = self.run_state("file.exists", name=WIN_TEST_FILE)
        self._assert_state_succeeded(ret)

    def test_file_copy(self):
        """
        Test file.copy on Windows
        """
        copy_target = "c:/testfile_copy"
        # tearDown only removes WIN_TEST_FILE, so remove the copy as well.
        self.addCleanup(self.run_state, "file.absent", name=copy_target)
        ret = self.run_state(
            "file.copy", name=copy_target, makedirs=True, source=WIN_TEST_FILE
        )
        self._assert_state_succeeded(ret)

    def test_file_comment(self):
        """
        Test file.comment on Windows
        """
        self.run_state("file.comment", name=WIN_TEST_FILE, regex="^Only")
        with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
            self.assertTrue(fp_.read().startswith("#Only"))

    def test_file_replace(self):
        """
        Test file.replace on Windows
        """
        self.run_state(
            "file.replace", name=WIN_TEST_FILE, pattern="test", repl="testing"
        )
        with salt.utils.files.fopen(WIN_TEST_FILE, "r") as fp_:
            self.assertIn("testing", fp_.read())

    def test_file_absent(self):
        """
        Test file.absent on Windows
        """
        ret = self.run_state("file.absent", name=WIN_TEST_FILE)
        self._assert_state_succeeded(ret)
|